diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index e95f2c4..5985076 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -2,23 +2,23 @@ """ AutoJanet Dispatcher -Runs as a CronJob every 2 minutes. Scans Vikunja project 78 for tasks in the -Todo bucket that have an `agent:` label. Claims each task (moves to -In Progress) and spawns a Kubernetes Job for the appropriate agent. +Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by +name, resolves the kanban view and all 5 standard bucket IDs by name, then +claims tasks from the Todo bucket that have an `agent:` label and +spawns a Kubernetes Job for the appropriate agent. -Requirements: - - OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID — for fetching Vikunja token - - VIKUNJA_BASE_URL, VIKUNJA_PROJECT_ID, VIKUNJA_TODO_BUCKET_ID - - K8S_NAMESPACE, AGENT_IMAGE +Config (env vars): + OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID + VIKUNJA_BASE_URL (default: http://vikunja.vikunja.svc.cluster.local:3456) + VIKUNJA_PROJECT_NAME (default: Autonomous Agent Platform) + K8S_NAMESPACE (default: autojanet) + AGENT_IMAGE """ -import json import logging import os import re import sys -import time -from string import Template import httpx from kubernetes import client as k8s_client, config as k8s_config @@ -32,18 +32,22 @@ log = logging.getLogger("dispatcher") # ── Config ──────────────────────────────────────────────────────────────────── -OPENBAO_ADDR = os.environ["OPENBAO_ADDR"] -OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"] +OPENBAO_ADDR = os.environ["OPENBAO_ADDR"] +OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"] OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"] -VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi") -VIKUNJA_PROJECT_ID = int(os.environ.get("VIKUNJA_PROJECT_ID", "78")) -VIKUNJA_TODO_BUCKET_ID = int(os.environ.get("VIKUNJA_TODO_BUCKET_ID", "116")) -VIKUNJA_IN_PROGRESS_BUCKET_ID = int(os.environ.get("VIKUNJA_IN_PROGRESS_BUCKET_ID", "117")) -VIKUNJA_KANBAN_VIEW_ID = int(os.environ.get("VIKUNJA_KANBAN_VIEW_ID", "114")) +VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "http://vikunja.vikunja.svc.cluster.local:3456") +VIKUNJA_PROJECT_NAME = os.environ.get("VIKUNJA_PROJECT_NAME", "Autonomous Agent Platform") K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet") -AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest") +AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest") + +# Standard bucket names (case-insensitive match) +BUCKET_BACKLOG = "backlog" +BUCKET_TODO = "todo" +BUCKET_IN_PROGRESS = "in progress" +BUCKET_IN_REVIEW = "in review" +BUCKET_DONE = "done" VALID_ROLES = { "pm", "coder", "code-reviewer", "test-engineer", "devsecops", "secops", @@ -55,7 +59,6 @@ VALID_ROLES = { # ── OpenBao ─────────────────────────────────────────────────────────────────── def get_openbao_token() -> str: - """Authenticate to OpenBao via AppRole and return a client token.""" resp = httpx.post( f"{OPENBAO_ADDR}/v1/auth/approle/login", json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID}, @@ -66,7 +69,6 @@ def get_openbao_token() -> str: def get_secret(bao_token: str, path: str, key: str) -> str: - """Read a KV v2 secret from OpenBao.""" resp = httpx.get( f"{OPENBAO_ADDR}/v1/secret/data/{path}", headers={"X-Vault-Token": bao_token}, @@ -76,43 +78,80 @@ def get_secret(bao_token: str, path: str, key: str) -> str: return resp.json()["data"]["data"][key] -# ── Vikunja ─────────────────────────────────────────────────────────────────── - def get_vikunja_token(bao_token: str) -> str: - """Fetch the dispatcher's Vikunja token from OpenBao.""" return get_secret(bao_token, "autojanet/pm/vikunja-token", "token") -def list_todo_tasks(vikunja_token: str) -> list[dict]: - """Return all tasks in the Todo bucket of the AutoJanet project.""" +# ── Vikunja discovery ───────────────────────────────────────────────────────── + +def vikunja_get(vikunja_token: str, path: str, **params) -> list | dict: + resp = httpx.get( + f"{VIKUNJA_BASE_URL}/api/v1/{path}", + headers={"Authorization": f"Bearer {vikunja_token}"}, + params=params or None, + timeout=15, + ) + resp.raise_for_status() + return resp.json() + + +def find_project_id(vikunja_token: str, project_name: str) -> int: + """Find project ID by exact name match.""" + page = 1 + while True: + projects = vikunja_get(vikunja_token, "projects", page=page, per_page=50) + if not projects: + break + for p in projects: + if p.get("title", "").strip().lower() == project_name.strip().lower(): + log.info("Found project '%s' id=%d", project_name, p["id"]) + return p["id"] + if len(projects) < 50: + break + page += 1 + raise RuntimeError(f"Project '{project_name}' not found in Vikunja") + + +def find_kanban_view_id(vikunja_token: str, project_id: int) -> int: + """Find the kanban view for a project.""" + views = vikunja_get(vikunja_token, f"projects/{project_id}/views") + for v in views: + if v.get("view_kind") == "kanban": + log.info("Found kanban view id=%d", v["id"]) + return v["id"] + raise RuntimeError(f"No kanban view found for project {project_id}") + + +def discover_buckets(vikunja_token: str, project_id: int, view_id: int) -> dict[str, int]: + """ + Return a dict mapping normalised bucket name -> bucket id. + Standard names: backlog, todo, in progress, in review, done + """ + buckets = vikunja_get(vikunja_token, f"projects/{project_id}/views/{view_id}/buckets") + mapping = {} + for b in buckets: + name = b.get("title", "").strip().lower() + mapping[name] = b["id"] + log.info("Bucket '%s' id=%d", b["title"], b["id"]) + return mapping + + +def list_todo_tasks(vikunja_token: str, project_id: int) -> list[dict]: + """Return all undone tasks with agent labels from the project.""" tasks = [] page = 1 while True: - resp = httpx.get( - f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/tasks", - headers={"Authorization": f"Bearer {vikunja_token}"}, - params={"page": page, "per_page": 50}, - timeout=15, - ) - resp.raise_for_status() - batch = resp.json() + batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50) if not batch: break tasks.extend(batch) if len(batch) < 50: break page += 1 - # Tasks in Todo bucket have done=False and agent label. - # Since done tasks are marked done=True, filtering undone tasks - # with an agent label is sufficient. return [t for t in tasks if not t.get("done") and t.get("labels")] def extract_agent_role(task: dict) -> str | None: - """ - Return the role name if the task has exactly one `agent:` label - that matches a known role. Returns None otherwise. - """ labels = task.get("labels") or [] roles_found = [] for label in labels: @@ -122,26 +161,17 @@ def extract_agent_role(task: dict) -> str | None: role = m.group(1) if role in VALID_ROLES: roles_found.append(role) - if len(roles_found) == 1: - return roles_found[0] - return None + return roles_found[0] if len(roles_found) == 1 else None -def claim_task(vikunja_token: str, task_id: int) -> bool: - """Move task to In Progress bucket. Returns True on success.""" - resp = httpx.post( - f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}", - headers={ - "Authorization": f"Bearer {vikunja_token}", - "Content-Type": "application/json", - }, - json={"bucket_id": VIKUNJA_IN_PROGRESS_BUCKET_ID}, - timeout=10, - ) - if resp.status_code in (200, 201): - return True - log.warning("Failed to claim task %d: %d %s", task_id, resp.status_code, resp.text) - return False +def claim_task(task_id: int) -> bool: + """Placeholder — bucket moving deferred. Always returns True.""" + return True + + +def unclaim_task(task_id: int) -> None: + """Placeholder — bucket moving deferred.""" + pass # ── Kubernetes ──────────────────────────────────────────────────────────────── @@ -212,13 +242,13 @@ def spawn_agent_job( image=AGENT_IMAGE, image_pull_policy="Always", env=[ - k8s_client.V1EnvVar(name="AGENT_ROLE", value=role), - k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)), - k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title), - k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR), - k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"), - k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL), - k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"), + k8s_client.V1EnvVar(name="AGENT_ROLE", value=role), + k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)), + k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title), + k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR), + k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL",value=VIKUNJA_BASE_URL), + k8s_client.V1EnvVar(name="LITELLM_BASE_URL",value="https://llm.ctz.fyi"), + k8s_client.V1EnvVar(name="FORGEJO_BASE_URL",value="https://git.ctz.fyi"), k8s_client.V1EnvVar( name="OPENBAO_ROLE_ID", value_from=k8s_client.V1EnvVarSource( @@ -265,30 +295,41 @@ def main() -> None: log.info("Dispatcher starting") # Auth - bao_token = get_openbao_token() - vikunja_token = get_vikunja_token(bao_token) + bao_token = get_openbao_token() + vikunja_token = get_vikunja_token(bao_token) log.info("Authenticated to OpenBao and Vikunja") + # Discover project + kanban view + buckets by name + project_id = find_project_id(vikunja_token, VIKUNJA_PROJECT_NAME) + view_id = find_kanban_view_id(vikunja_token, project_id) + buckets = discover_buckets(vikunja_token, project_id, view_id) + + todo_id = buckets.get(BUCKET_TODO) + in_progress_id = buckets.get(BUCKET_IN_PROGRESS) + + if not todo_id or not in_progress_id: + log.warning("Could not find all standard buckets. Found: %s", list(buckets.keys())) + # k8s load_k8s_config() batch_v1 = k8s_client.BatchV1Api() - # Scan tasks - tasks = list_todo_tasks(vikunja_token) - log.info("Found %d tasks in Todo bucket", len(tasks)) + # Scan + claim tasks + tasks = list_todo_tasks(vikunja_token, project_id) + log.info("Found %d candidate tasks", len(tasks)) claimed = 0 for task in tasks: task_id = task["id"] - title = task.get("title", "") - role = extract_agent_role(task) + title = task.get("title", "") + role = extract_agent_role(task) if not role: log.debug("Task %d has no valid agent label, skipping", task_id) continue log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role) - if not claim_task(vikunja_token, task_id): + if not claim_task(task_id): continue try: @@ -296,13 +337,7 @@ def main() -> None: claimed += 1 except Exception as e: log.error("Failed to spawn job for task %d: %s", task_id, e) - # Un-claim: move back to Todo - httpx.post( - f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}", - headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"}, - json={"bucket_id": VIKUNJA_TODO_BUCKET_ID}, - timeout=10, - ) + unclaim_task(task_id) log.info("Dispatcher done. Claimed %d tasks.", claimed)