#!/usr/bin/env python3
"""
AutoJanet Dispatcher

Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by
name, resolves the kanban view and all 5 standard bucket IDs by name, then:

  1. Sweeps done=True tasks into the Done bucket.
  2. Watchdog: moves stale InProgress tasks (dead job) back to Todo, or to
     Backlog once a task has reached MAX_TASK_RETRIES jobs.
  3. Review orchestration: spawns a PM agent for each task in Review that
     needs sub-tasks created (code-review, security-review, etc.).
  4. Claims tasks from the Todo bucket and spawns agent Jobs.

Config (env vars):
  OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID   (required)
  VIKUNJA_BASE_URL      (default: http://vikunja.vikunja.svc.cluster.local:3456)
  VIKUNJA_PROJECT_NAME  (default: Autonomous Agent Platform)
  K8S_NAMESPACE         (default: autojanet)
  AGENT_IMAGE           (default: registry.ctz.fyi/library/autojanet-agent:latest)
  MAX_JOBS_PER_ROLE     (default: 2)   max concurrent agent jobs per role
  MAX_JOBS_TOTAL        (default: 10)  max concurrent agent jobs, all roles
  MAX_TASK_RETRIES      (default: 3)   jobs per task before it is moved to Backlog
"""

import logging
import os
import re
import sys

import httpx
from kubernetes import client as k8s_client, config as k8s_config

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)
log = logging.getLogger("dispatcher")

# ── Config ────────────────────────────────────────────────────────────────────


def _env_int(name: str, default: int) -> int:
    """Return env var ``name`` parsed as an int, or ``default`` when unset.

    Raises:
        ValueError: if the variable is set but not an integer; unlike a bare
            ``int()`` call, the message names the offending variable.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None


# Required: OpenBao address and AppRole credentials. A missing variable fails
# at import time with a KeyError naming it.
OPENBAO_ADDR = os.environ["OPENBAO_ADDR"]
OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"]
OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"]

VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "http://vikunja.vikunja.svc.cluster.local:3456")
VIKUNJA_PROJECT_NAME = os.environ.get("VIKUNJA_PROJECT_NAME", "Autonomous Agent Platform")
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest")

# Max concurrent jobs per role (and global across all roles)
MAX_JOBS_PER_ROLE = _env_int("MAX_JOBS_PER_ROLE", 2)
MAX_JOBS_TOTAL = _env_int("MAX_JOBS_TOTAL", 10)
# Number of jobs a task may accumulate before the watchdog moves it to Backlog.
MAX_TASK_RETRIES = _env_int("MAX_TASK_RETRIES", 3)

# Standard bucket names (case-insensitive match)
BUCKET_BACKLOG = "backlog"
BUCKET_TODO = "todo"
BUCKET_IN_PROGRESS = "in progress"
BUCKET_IN_REVIEW = "review"
BUCKET_DONE = "done"

# Roles an ``agent:<role>`` label may name. Each role's Job runs as
# ServiceAccount ``agent-<role>`` and reads Secret ``agent-<role>-approle``
# (see _build_job).
VALID_ROLES = {
    "pm",
    "coder",
    "code-reviewer",
    "test-engineer",
    "devsecops",
    "secops",
    "sre",
    "kubernetes-pilot",
    "linux-admin",
    "systems-engineer",
    "networking",
    "dba",
    "prometheus-expert",
    "tofu-engineer",
    "release-manager",
    "doc-updater",
    "doc-writer",
    "technical-writer",
    "cost-optimizer",
}

# Service endpoints handed to every spawned Job; overridable via env vars of
# the same name.
LITELLM_BASE_URL = os.environ.get("LITELLM_BASE_URL", "https://llm.ctz.fyi")
FORGEJO_BASE_URL = os.environ.get("FORGEJO_BASE_URL", "https://git.ctz.fyi")

# per_page requested from paged Vikunja list endpoints.
VIKUNJA_PAGE_SIZE = 50

# Kubernetes' default spec.backoffLimit, used only if a Job reports none.
K8S_DEFAULT_BACKOFF_LIMIT = 6


# ── OpenBao ───────────────────────────────────────────────────────────────────


def get_openbao_token() -> str:
    """Log in to OpenBao with the AppRole credentials and return a client token.

    Raises:
        httpx.HTTPStatusError: if the login is rejected.
    """
    resp = httpx.post(
        f"{OPENBAO_ADDR}/v1/auth/approle/login",
        json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["auth"]["client_token"]


def get_secret(bao_token: str, path: str, key: str) -> str:
    """Return ``key`` from the KV v2 secret read at ``/v1/secret/data/<path>``."""
    resp = httpx.get(
        f"{OPENBAO_ADDR}/v1/secret/data/{path}",
        headers={"X-Vault-Token": bao_token},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["data"]["data"][key]


def get_vikunja_token(bao_token: str) -> str:
    """Fetch the Vikunja API token the dispatcher authenticates with."""
    return get_secret(bao_token, "autojanet/pm/vikunja-token", "token")


# ── Vikunja API helpers ───────────────────────────────────────────────────────


def _vikunja_headers(vikunja_token: str) -> dict[str, str]:
    """Return bearer-auth headers for a Vikunja request."""
    return {"Authorization": f"Bearer {vikunja_token}"}


def vikunja_get(vikunja_token: str, path: str, **params) -> list | dict:
    """GET ``/api/v1/<path>`` with ``params`` as the query string; return decoded JSON."""
    resp = httpx.get(
        f"{VIKUNJA_BASE_URL}/api/v1/{path}",
        headers=_vikunja_headers(vikunja_token),
        params=params or None,
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()


def vikunja_post(vikunja_token: str, path: str, body: dict) -> dict:
    """POST ``body`` as JSON to ``/api/v1/<path>``; return decoded JSON."""
    resp = httpx.post(
        f"{VIKUNJA_BASE_URL}/api/v1/{path}",
        headers={**_vikunja_headers(vikunja_token), "Content-Type": "application/json"},
        json=body,
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()


def _iter_vikunja_pages(vikunja_token: str, path: str, **params):
    """Yield successive non-empty pages (lists) from a paged Vikunja endpoint.

    Stops at an empty page, or after a page shorter than VIKUNJA_PAGE_SIZE,
    which is treated as the last one. Being a generator, callers that stop
    early (e.g. on a match) avoid fetching later pages.
    """
    page = 1
    while True:
        batch = vikunja_get(vikunja_token, path, page=page, per_page=VIKUNJA_PAGE_SIZE, **params)
        if not batch:
            return
        yield batch
        if len(batch) < VIKUNJA_PAGE_SIZE:
            return
        page += 1


def _vikunja_list_all(vikunja_token: str, path: str, **params) -> list[dict]:
    """Return every item of a paged Vikunja endpoint as a single list."""
    return [item for batch in _iter_vikunja_pages(vikunja_token, path, **params) for item in batch]


def _move_task_to_bucket(
    vikunja_token: str,
    project_id: int,
    view_id: int,
    bucket_id: int,
    task_id: int,
) -> None:
    """Put ``task_id`` into ``bucket_id`` via the kanban view's bucket-task endpoint.

    Raises:
        httpx.HTTPError: if the request fails.
    """
    vikunja_post(
        vikunja_token,
        f"projects/{project_id}/views/{view_id}/buckets/{bucket_id}/tasks",
        {"task_id": task_id},
    )


def _set_task_bucket(
    vikunja_token: str,
    task_id: int,
    bucket_id: int,
    project_id: int | None,
    view_id: int | None,
) -> None:
    """Move a task to ``bucket_id``, preferring the view bucket endpoint.

    When both ``project_id`` and ``view_id`` are known, this uses the same
    endpoint as the sweep and watchdog. Otherwise it falls back to the legacy
    task-update call.
    """
    if project_id is not None and view_id is not None:
        _move_task_to_bucket(vikunja_token, project_id, view_id, bucket_id, task_id)
        return
    # Legacy path: posts a partial task object to the task-update endpoint.
    # NOTE(review): Vikunja's task update may reset fields absent from the
    # body (e.g. description) — callers should pass project_id/view_id.
    vikunja_post(vikunja_token, f"tasks/{task_id}", {"bucket_id": bucket_id})


# ── Vikunja discovery ─────────────────────────────────────────────────────────


def find_project_id(vikunja_token: str, project_name: str) -> int:
    """Return the id of the project titled ``project_name``.

    The comparison ignores case and surrounding whitespace. Pages are fetched
    only until a match is found.

    Raises:
        RuntimeError: if no project matches.
    """
    wanted = project_name.strip().lower()
    for batch in _iter_vikunja_pages(vikunja_token, "projects"):
        for p in batch:
            if (p.get("title") or "").strip().lower() == wanted:
                log.info("Found project '%s' id=%d", project_name, p["id"])
                return p["id"]
    raise RuntimeError(f"Project '{project_name}' not found in Vikunja")


def find_kanban_view_id(vikunja_token: str, project_id: int) -> int:
    """Return the id of the first view of the project whose view_kind is "kanban".

    Raises:
        RuntimeError: if the project has no kanban view.
    """
    views = vikunja_get(vikunja_token, f"projects/{project_id}/views")
    for v in views:
        if v.get("view_kind") == "kanban":
            log.info("Found kanban view id=%d", v["id"])
            return v["id"]
    raise RuntimeError(f"No kanban view found for project {project_id}")


def discover_buckets(vikunja_token: str, project_id: int, view_id: int) -> dict[str, int]:
    """Map each bucket's normalised (stripped, lower-cased) title to its id.

    main() looks up the standard names from the BUCKET_* constants:
    backlog, todo, in progress, review, done. If two buckets normalise to the
    same title, the one listed last wins.
    """
    buckets = vikunja_get(vikunja_token, f"projects/{project_id}/views/{view_id}/buckets")
    mapping: dict[str, int] = {}
    for b in buckets:
        mapping[b.get("title", "").strip().lower()] = b["id"]
        log.info("Bucket '%s' id=%d", b["title"], b["id"])
    return mapping


# ── Vikunja tasks ─────────────────────────────────────────────────────────────


def list_tasks_in_bucket(vikunja_token: str, project_id: int, bucket_id: int) -> list[dict]:
    """Return all undone tasks in ``bucket_id``, using Vikunja's filter query."""
    tasks = _vikunja_list_all(
        vikunja_token,
        f"projects/{project_id}/tasks",
        filter=f"bucket_id = {bucket_id}",
    )
    return [t for t in tasks if not t.get("done")]


def list_todo_tasks(vikunja_token: str, project_id: int, todo_id: int) -> list[dict]:
    """Return undone tasks in the Todo bucket that carry at least one label.

    Whether a label is a valid ``agent:<role>`` label is decided later by
    extract_agent_role().
    """
    return [t for t in list_tasks_in_bucket(vikunja_token, project_id, todo_id) if t.get("labels")]


def claim_task(
    vikunja_token: str,
    task_id: int,
    in_progress_id: int,
    project_id: int | None = None,
    view_id: int | None = None,
) -> bool:
    """Move a task from Todo to In Progress.

    Pass ``project_id`` and ``view_id`` so the move uses the kanban bucket
    endpoint instead of a partial task update.

    Returns:
        True on success; False (after logging) if the move failed.
    """
    try:
        _set_task_bucket(vikunja_token, task_id, in_progress_id, project_id, view_id)
    except Exception as e:
        log.error("Failed to claim task %d: %s", task_id, e)
        return False
    log.info("Moved task %d → In Progress (bucket %d)", task_id, in_progress_id)
    return True


def unclaim_task(
    vikunja_token: str,
    task_id: int,
    todo_id: int,
    project_id: int | None = None,
    view_id: int | None = None,
) -> None:
    """Best-effort: move a task back to Todo after a job spawn failure."""
    try:
        _set_task_bucket(vikunja_token, task_id, todo_id, project_id, view_id)
        log.info("Unclaimed task %d → Todo", task_id)
    except Exception as e:
        log.warning("Failed to unclaim task %d: %s", task_id, e)


def sweep_done_tasks(
    vikunja_token: str,
    project_id: int,
    view_id: int,
    done_bucket_id: int,
) -> None:
    """Move tasks marked done=True that are not yet in the Done bucket into it.

    Tasks whose listing reports ``bucket_id == done_bucket_id`` are skipped.
    If the listing does not report a bucket_id, the task is re-posted, which
    is harmless. NOTE(review): Vikunja may only populate bucket_id for
    view-based listings.
    """
    moved = 0
    for task in _vikunja_list_all(vikunja_token, f"projects/{project_id}/tasks"):
        if not task.get("done") or task.get("bucket_id") == done_bucket_id:
            continue
        task_id = task["id"]
        try:
            _move_task_to_bucket(vikunja_token, project_id, view_id, done_bucket_id, task_id)
        except Exception as e:
            log.warning("Failed to sweep task %d to Done: %s", task_id, e)
            continue
        log.info("Swept done task %d → Done bucket", task_id)
        moved += 1
    log.info("Done sweep: moved %d tasks", moved)


def post_stale_comment(vikunja_token: str, task_id: int, reason: str) -> None:
    """Best-effort: add ``reason`` as a comment on the task; failures are only logged."""
    try:
        httpx.put(
            f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/comments",
            headers={**_vikunja_headers(vikunja_token), "Content-Type": "application/json"},
            json={"comment": reason},
            timeout=10,
        ).raise_for_status()
    except Exception as e:
        log.warning("Failed to post stale comment on task %d: %s", task_id, e)


# ── Role extraction ───────────────────────────────────────────────────────────


def extract_agent_role(task: dict) -> str | None:
    """Return the role of the task's first ``agent:<role>`` label naming a VALID_ROLES entry, else None."""
    prefix = "agent:"
    for label in task.get("labels") or []:
        title = label.get("title", "")
        if title.startswith(prefix) and title[len(prefix):] in VALID_ROLES:
            return title[len(prefix):]
    return None


# ── Kubernetes job status ─────────────────────────────────────────────────────


def _job_is_finished(job: k8s_client.V1Job) -> bool:
    """Return True once a Job has a terminal outcome (succeeded or failed).

    Prefers the Job's Complete/Failed conditions. It falls back to counts, so
    that backoffLimit=0 is honoured rather than being coerced to 1; an unset
    limit uses the Kubernetes default.
    """
    status = job.status
    if status is None:
        return False
    for cond in status.conditions or []:
        if cond.type in ("Complete", "Failed") and cond.status == "True":
            return True
    if (status.succeeded or 0) > 0:
        return True
    backoff_limit = job.spec.backoff_limit if job.spec is not None else None
    if backoff_limit is None:
        backoff_limit = K8S_DEFAULT_BACKOFF_LIMIT
    return (status.failed or 0) >= backoff_limit + 1


def _list_task_jobs(
    batch_v1: k8s_client.BatchV1Api,
    task_id: int,
    *,
    job_type: str | None = None,
    role: str | None = None,
) -> list[k8s_client.V1Job]:
    """List Jobs labelled with ``task_id``, optionally narrowed by type and role labels."""
    selectors = [f"autojanet/task-id={task_id}"]
    if job_type is not None:
        selectors.append(f"autojanet/type={job_type}")
    if role is not None:
        selectors.append(f"autojanet/role={role}")
    jobs = batch_v1.list_namespaced_job(
        namespace=K8S_NAMESPACE,
        label_selector=",".join(selectors),
        _request_timeout=15,
    )
    return jobs.items


# ── Stale watchdog ────────────────────────────────────────────────────────────


def count_jobs_for_task(batch_v1: k8s_client.BatchV1Api, task_id: int) -> tuple[int, int]:
    """
    Return (active_count, total_ever) for all jobs with this task_id label.

    active = job has no terminal outcome yet (still running or pending).
    total_ever = every Job still present for this task, regardless of outcome.
    That includes agent and review-pm Jobs alike, since both carry the
    task-id label.

    Used to detect both dead-job stale and successful-but-looping scenarios.
    """
    items = _list_task_jobs(batch_v1, task_id)
    active = sum(1 for job in items if not _job_is_finished(job))
    return active, len(items)


def watchdog_stale_tasks(
    vikunja_token: str,
    batch_v1: k8s_client.BatchV1Api,
    project_id: int,
    view_id: int,
    in_progress_id: int,
    todo_id: int,
    backlog_id: int,
) -> None:
    """
    For every task stuck in InProgress, check whether any of its Jobs is still alive.

    - Some job still active: leave the task alone.
    - No active job:
        - job count < MAX_TASK_RETRIES  → move back to Todo + comment
        - job count >= MAX_TASK_RETRIES → move to Backlog + comment

    A comment is posted only after the move succeeds. If the move fails, the
    task stays In Progress and is retried on the next run.

    NOTE: Jobs are created with ttl_seconds_after_finished=3600. Finished Jobs
    are therefore deleted after an hour and stop counting toward the limit.
    """
    stale = list_tasks_in_bucket(vikunja_token, project_id, in_progress_id)
    log.info("Watchdog: checking %d InProgress tasks", len(stale))

    for task in stale:
        task_id = task["id"]
        active, total_ever = count_jobs_for_task(batch_v1, task_id)

        if active > 0:
            log.debug("Task %d has %d active job(s), leaving alone", task_id, active)
            continue

        # No active job — task is stuck or looping
        log.warning("Task %d is stale: active=%d total_attempts=%d", task_id, active, total_ever)

        if total_ever >= MAX_TASK_RETRIES:
            log.error("Task %d hit retry limit (%d/%d), moving to Backlog", task_id, total_ever, MAX_TASK_RETRIES)
            try:
                _move_task_to_bucket(vikunja_token, project_id, view_id, backlog_id, task_id)
            except Exception as e:
                log.warning("Failed to move task %d to Backlog: %s", task_id, e)
                continue
            post_stale_comment(
                vikunja_token,
                task_id,
                f"🚨 **Watchdog**: task has been attempted {total_ever} time(s) and hit the retry limit "
                f"(`MAX_TASK_RETRIES={MAX_TASK_RETRIES}`). Moved to **Backlog** for manual review.",
            )
        else:
            log.info("Task %d attempt %d/%d, requeueing to Todo", task_id, total_ever + 1, MAX_TASK_RETRIES)
            try:
                _move_task_to_bucket(vikunja_token, project_id, view_id, todo_id, task_id)
            except Exception as e:
                log.warning("Failed to requeue task %d to Todo: %s", task_id, e)
                continue
            post_stale_comment(
                vikunja_token,
                task_id,
                f"⚠️ **Watchdog**: no active job found for this task (attempt {total_ever + 1}/{MAX_TASK_RETRIES}). "
                f"Requeued to **Todo** for retry.",
            )


# ── Kubernetes ────────────────────────────────────────────────────────────────


def load_k8s_config() -> None:
    """Load in-cluster k8s config, falling back to the local kubeconfig."""
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config()


def _build_job(
    *,
    job_type: str,
    role: str,
    task_id: int,
    task_title: str,
    in_review_bucket_id: int,
    project_id: int,
    view_id: int,
    name: str | None = None,
    generate_name: str | None = None,
    extra_env: list[k8s_client.V1EnvVar] | None = None,
) -> k8s_client.V1Job:
    """Build the Job manifest shared by agent and review-PM jobs.

    Give exactly one of ``name`` or ``generate_name``. The pod runs AGENT_IMAGE
    as ServiceAccount ``agent-<role>``. It reads OPENBAO_ROLE_ID and
    OPENBAO_SECRET_ID from Secret ``agent-<role>-approle``. ``extra_env`` is
    inserted right after AGENT_ROLE.
    """
    labels = {
        "autojanet/type": job_type,
        "autojanet/role": role,
        "autojanet/task-id": str(task_id),
    }
    approle_secret = f"agent-{role}-approle"

    def secret_env(env_name: str, key: str) -> k8s_client.V1EnvVar:
        return k8s_client.V1EnvVar(
            name=env_name,
            value_from=k8s_client.V1EnvVarSource(
                secret_key_ref=k8s_client.V1SecretKeySelector(name=approle_secret, key=key),
            ),
        )

    env = [
        k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
        *(extra_env or []),
        k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
        k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
        k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
        k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
        k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value=LITELLM_BASE_URL),
        k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value=FORGEJO_BASE_URL),
        k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)),
        k8s_client.V1EnvVar(name="VIKUNJA_PROJECT_ID", value=str(project_id)),
        k8s_client.V1EnvVar(name="VIKUNJA_VIEW_ID", value=str(view_id)),
        secret_env("OPENBAO_ROLE_ID", "role_id"),
        secret_env("OPENBAO_SECRET_ID", "secret_id"),
    ]

    container = k8s_client.V1Container(
        name="agent",
        image=AGENT_IMAGE,
        image_pull_policy="Always",
        env=env,
        resources=k8s_client.V1ResourceRequirements(
            requests={"cpu": "250m", "memory": "512Mi"},
            limits={"cpu": "2000m", "memory": "2Gi"},
        ),
        security_context=k8s_client.V1SecurityContext(
            allow_privilege_escalation=False,
            run_as_non_root=True,
            run_as_user=1000,
            capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
        ),
    )

    return k8s_client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=k8s_client.V1ObjectMeta(
            name=name,
            generate_name=generate_name,
            namespace=K8S_NAMESPACE,
            labels=dict(labels),
        ),
        spec=k8s_client.V1JobSpec(
            ttl_seconds_after_finished=3600,
            backoff_limit=1,
            template=k8s_client.V1PodTemplateSpec(
                metadata=k8s_client.V1ObjectMeta(labels=dict(labels)),
                spec=k8s_client.V1PodSpec(
                    service_account_name=f"agent-{role}",
                    restart_policy="Never",
                    node_selector={"kubernetes.io/arch": "amd64"},
                    containers=[container],
                ),
            ),
        ),
    )


def count_active_jobs(batch_v1: k8s_client.BatchV1Api) -> tuple[int, dict[str, int]]:
    """Return (total_active, {role: count}) for all unfinished agent jobs."""
    jobs = batch_v1.list_namespaced_job(
        namespace=K8S_NAMESPACE,
        label_selector="autojanet/type=agent",
        _request_timeout=15,
    )
    by_role: dict[str, int] = {}
    for job in jobs.items:
        if _job_is_finished(job):
            continue
        role = (job.metadata.labels or {}).get("autojanet/role", "unknown")
        by_role[role] = by_role.get(role, 0) + 1
    return sum(by_role.values()), by_role


def job_name(role: str, task_id: int) -> str:
    """Return the base agent Job name: hyphens dropped from role, truncated to 12 chars."""
    safe_role = role.replace("-", "")[:12]
    return f"agent-{safe_role}-{task_id}"


def job_already_exists(batch_v1: k8s_client.BatchV1Api, name: str) -> bool:
    """Return True if a Job called ``name`` exists; re-raise non-404 API errors."""
    try:
        batch_v1.read_namespaced_job(name=name, namespace=K8S_NAMESPACE, _request_timeout=15)
        return True
    except k8s_client.ApiException as e:
        if e.status == 404:
            return False
        raise


def spawn_agent_job(
    batch_v1: k8s_client.BatchV1Api,
    role: str,
    task_id: int,
    task_title: str,
    in_review_bucket_id: int,
    project_id: int,
    view_id: int,
) -> bool:
    """Create an agent Job for ``role`` working on ``task_id``.

    Every attempt gets its own Job: the name is ``job_name(role, task_id)``
    plus a random suffix assigned by the API server (generateName). With a
    fixed name, a retry would collide with the previous attempt's finished
    Job, which is kept for ttl_seconds_after_finished. The retry would then
    silently spawn nothing, and the watchdog's attempt count would never grow.

    Returns:
        True if a Job was created. False if an unfinished agent Job for the
        same role and task already exists; nothing is created in that case.

    Raises:
        kubernetes.client.ApiException: if listing or creating Jobs fails.
    """
    live = [
        job
        for job in _list_task_jobs(batch_v1, task_id, job_type="agent", role=role)
        if not _job_is_finished(job)
    ]
    if live:
        log.info("Agent job %s still active for task %d, skipping", live[0].metadata.name, task_id)
        return False

    prefix = f"{job_name(role, task_id)}-"
    job = _build_job(
        job_type="agent",
        role=role,
        task_id=task_id,
        task_title=task_title,
        in_review_bucket_id=in_review_bucket_id,
        project_id=project_id,
        view_id=view_id,
        generate_name=prefix,
    )
    log.info("Creating k8s job with prefix %s", prefix)
    created = batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30)
    log.info("Spawned job %s for role=%s task=%d", created.metadata.name, role, task_id)
    return True


# ── Review orchestration ──────────────────────────────────────────────────────

# Roles that should NOT trigger review orchestration (would cause loops)
REVIEW_SKIP_ROLES = {"pm", "code-reviewer"}


def spawn_review_pm_job(
    batch_v1: k8s_client.BatchV1Api,
    task_id: int,
    task_title: str,
    in_review_bucket_id: int,
    project_id: int,
    view_id: int,
) -> None:
    """Spawn a PM agent job (TASK_TYPE=review_orchestration) for a task in Review.

    The Job is named ``review-pm-<task_id>``. Nothing is done while a Job of
    that name still exists.
    """
    name = f"review-pm-{task_id}"
    if job_already_exists(batch_v1, name):
        log.debug("Review PM job %s already exists, skipping", name)
        return

    job = _build_job(
        job_type="review-pm",
        role="pm",
        task_id=task_id,
        task_title=task_title,
        in_review_bucket_id=in_review_bucket_id,
        project_id=project_id,
        view_id=view_id,
        name=name,
        extra_env=[k8s_client.V1EnvVar(name="TASK_TYPE", value="review_orchestration")],
    )
    log.info("Spawning review PM job %s for task %d", name, task_id)
    batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30)


def orchestrate_review_tasks(
    vikunja_token: str,
    batch_v1: k8s_client.BatchV1Api,
    project_id: int,
    view_id: int,
    in_review_id: int,
) -> None:
    """
    Scan the Review bucket. For each task whose agent role is not in
    REVIEW_SKIP_ROLES, spawn a PM agent to create review sub-tasks. Tasks
    with a review-pm job already present are skipped.

    A failed spawn is logged and does not stop the remaining tasks or the
    rest of the dispatcher run.
    """
    review_tasks = list_tasks_in_bucket(vikunja_token, project_id, in_review_id)
    log.info("Review orchestration: checking %d tasks in Review bucket", len(review_tasks))

    for task in review_tasks:
        task_id = task["id"]
        role = extract_agent_role(task)
        if not role or role in REVIEW_SKIP_ROLES:
            log.debug("Task %d role=%s skipped for review orchestration", task_id, role)
            continue
        try:
            spawn_review_pm_job(
                batch_v1,
                task_id=task_id,
                task_title=task.get("title", f"Task {task_id}"),
                in_review_bucket_id=in_review_id,
                project_id=project_id,
                view_id=view_id,
            )
        except Exception as e:
            log.error("Failed to spawn review PM job for task %d: %s", task_id, e)


# ── Main ──────────────────────────────────────────────────────────────────────


def main() -> None:
    """Run one dispatcher pass: sweep, watchdog, review orchestration, then claim Todo tasks."""
    log.info("Dispatcher starting")

    # Auth
    bao_token = get_openbao_token()
    vikunja_token = get_vikunja_token(bao_token)
    log.info("Authenticated to OpenBao and Vikunja")

    # Discover project + kanban view + buckets by name
    project_id = find_project_id(vikunja_token, VIKUNJA_PROJECT_NAME)
    view_id = find_kanban_view_id(vikunja_token, project_id)
    buckets = discover_buckets(vikunja_token, project_id, view_id)

    todo_id = buckets.get(BUCKET_TODO)
    in_progress_id = buckets.get(BUCKET_IN_PROGRESS)
    in_review_id = buckets.get(BUCKET_IN_REVIEW)
    done_id = buckets.get(BUCKET_DONE)
    backlog_id = buckets.get(BUCKET_BACKLOG)

    if not all([todo_id, in_progress_id, in_review_id, done_id, backlog_id]):
        log.error("Could not find all standard buckets. Found: %s", list(buckets.keys()))
        sys.exit(1)

    # k8s
    load_k8s_config()
    batch_v1 = k8s_client.BatchV1Api()

    # 1. Sweep: move any done=True tasks into the Done bucket
    sweep_done_tasks(vikunja_token, project_id, view_id, done_id)

    # 2. Watchdog: requeue or escalate stale InProgress tasks
    watchdog_stale_tasks(
        vikunja_token, batch_v1, project_id, view_id,
        in_progress_id, todo_id, backlog_id,
    )

    # 3. Review orchestration: spawn PM jobs for tasks awaiting review
    orchestrate_review_tasks(vikunja_token, batch_v1, project_id, view_id, in_review_id)

    # 4. Scan Todo bucket for claimable tasks
    tasks = list_todo_tasks(vikunja_token, project_id, todo_id)
    log.info("Found %d candidate tasks in Todo bucket", len(tasks))

    # Check current job counts before spawning anything
    total_active, active_by_role = count_active_jobs(batch_v1)
    log.info("Active jobs: total=%d limits(per_role=%d, total=%d)", total_active, MAX_JOBS_PER_ROLE, MAX_JOBS_TOTAL)

    claimed = 0
    for task in tasks:
        task_id = task["id"]
        title = task.get("title", "")
        role = extract_agent_role(task)

        if not role:
            log.debug("Task %d has no valid agent label, skipping", task_id)
            continue

        # Enforce concurrency limits
        if total_active >= MAX_JOBS_TOTAL:
            log.info("Global job limit reached (%d/%d), stopping", total_active, MAX_JOBS_TOTAL)
            break
        if active_by_role.get(role, 0) >= MAX_JOBS_PER_ROLE:
            log.info("Role %s at limit (%d/%d), skipping task %d", role, active_by_role.get(role, 0), MAX_JOBS_PER_ROLE, task_id)
            continue

        log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
        if not claim_task(vikunja_token, task_id, in_progress_id, project_id=project_id, view_id=view_id):
            continue

        try:
            spawned = spawn_agent_job(batch_v1, role, task_id, title, in_review_id, project_id, view_id)
        except Exception as e:
            log.error("Failed to spawn job for task %d: %s", task_id, e)
            unclaim_task(vikunja_token, task_id, todo_id, project_id=project_id, view_id=view_id)
            continue

        if not spawned:
            # An unfinished agent job already owns this task, so it correctly
            # stays In Progress. count_active_jobs already counted that job's slot.
            continue

        claimed += 1
        total_active += 1
        active_by_role[role] = active_by_role.get(role, 0) + 1

    log.info("Dispatcher done. Claimed %d tasks.", claimed)


if __name__ == "__main__":
    main()