refactor: dispatcher discovers project/view/buckets by name, defers bucket moves
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Zoë 2026-05-30 17:43:03 -07:00
parent 69d40646be
commit c5a462607a

View file

@ -2,23 +2,23 @@
""" """
AutoJanet Dispatcher AutoJanet Dispatcher
Runs as a CronJob every 2 minutes. Scans Vikunja project 78 for tasks in the Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by
Todo bucket that have an `agent:<role>` label. Claims each task (moves to name, resolves the kanban view and all 5 standard bucket IDs by name, then
In Progress) and spawns a Kubernetes Job for the appropriate agent. claims tasks from the Todo bucket that have an `agent:<role>` label and
spawns a Kubernetes Job for the appropriate agent.
Requirements: Config (env vars):
- OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID for fetching Vikunja token OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID
- VIKUNJA_BASE_URL, VIKUNJA_PROJECT_ID, VIKUNJA_TODO_BUCKET_ID VIKUNJA_BASE_URL (default: http://vikunja.vikunja.svc.cluster.local:3456)
- K8S_NAMESPACE, AGENT_IMAGE VIKUNJA_PROJECT_NAME (default: Autonomous Agent Platform)
K8S_NAMESPACE (default: autojanet)
AGENT_IMAGE
""" """
import json
import logging import logging
import os import os
import re import re
import sys import sys
import time
from string import Template
import httpx import httpx
from kubernetes import client as k8s_client, config as k8s_config from kubernetes import client as k8s_client, config as k8s_config
@ -32,18 +32,22 @@ log = logging.getLogger("dispatcher")
# ── Config ──────────────────────────────────────────────────────────────────── # ── Config ────────────────────────────────────────────────────────────────────
OPENBAO_ADDR = os.environ["OPENBAO_ADDR"] OPENBAO_ADDR = os.environ["OPENBAO_ADDR"]
OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"] OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"]
OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"] OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"]
VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi") VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "http://vikunja.vikunja.svc.cluster.local:3456")
VIKUNJA_PROJECT_ID = int(os.environ.get("VIKUNJA_PROJECT_ID", "78")) VIKUNJA_PROJECT_NAME = os.environ.get("VIKUNJA_PROJECT_NAME", "Autonomous Agent Platform")
VIKUNJA_TODO_BUCKET_ID = int(os.environ.get("VIKUNJA_TODO_BUCKET_ID", "116"))
VIKUNJA_IN_PROGRESS_BUCKET_ID = int(os.environ.get("VIKUNJA_IN_PROGRESS_BUCKET_ID", "117"))
VIKUNJA_KANBAN_VIEW_ID = int(os.environ.get("VIKUNJA_KANBAN_VIEW_ID", "114"))
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet") K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest") AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest")
# Standard bucket names (case-insensitive match)
BUCKET_BACKLOG = "backlog"
BUCKET_TODO = "todo"
BUCKET_IN_PROGRESS = "in progress"
BUCKET_IN_REVIEW = "in review"
BUCKET_DONE = "done"
VALID_ROLES = { VALID_ROLES = {
"pm", "coder", "code-reviewer", "test-engineer", "devsecops", "secops", "pm", "coder", "code-reviewer", "test-engineer", "devsecops", "secops",
@ -55,7 +59,6 @@ VALID_ROLES = {
# ── OpenBao ─────────────────────────────────────────────────────────────────── # ── OpenBao ───────────────────────────────────────────────────────────────────
def get_openbao_token() -> str: def get_openbao_token() -> str:
"""Authenticate to OpenBao via AppRole and return a client token."""
resp = httpx.post( resp = httpx.post(
f"{OPENBAO_ADDR}/v1/auth/approle/login", f"{OPENBAO_ADDR}/v1/auth/approle/login",
json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID}, json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID},
@ -66,7 +69,6 @@ def get_openbao_token() -> str:
def get_secret(bao_token: str, path: str, key: str) -> str: def get_secret(bao_token: str, path: str, key: str) -> str:
"""Read a KV v2 secret from OpenBao."""
resp = httpx.get( resp = httpx.get(
f"{OPENBAO_ADDR}/v1/secret/data/{path}", f"{OPENBAO_ADDR}/v1/secret/data/{path}",
headers={"X-Vault-Token": bao_token}, headers={"X-Vault-Token": bao_token},
@ -76,43 +78,80 @@ def get_secret(bao_token: str, path: str, key: str) -> str:
return resp.json()["data"]["data"][key] return resp.json()["data"]["data"][key]
# ── Vikunja ───────────────────────────────────────────────────────────────────
def get_vikunja_token(bao_token: str) -> str: def get_vikunja_token(bao_token: str) -> str:
"""Fetch the dispatcher's Vikunja token from OpenBao."""
return get_secret(bao_token, "autojanet/pm/vikunja-token", "token") return get_secret(bao_token, "autojanet/pm/vikunja-token", "token")
def list_todo_tasks(vikunja_token: str) -> list[dict]: # ── Vikunja discovery ─────────────────────────────────────────────────────────
"""Return all tasks in the Todo bucket of the AutoJanet project."""
def vikunja_get(vikunja_token: str, path: str, **params) -> list | dict:
resp = httpx.get(
f"{VIKUNJA_BASE_URL}/api/v1/{path}",
headers={"Authorization": f"Bearer {vikunja_token}"},
params=params or None,
timeout=15,
)
resp.raise_for_status()
return resp.json()
def find_project_id(vikunja_token: str, project_name: str) -> int:
"""Find project ID by exact name match."""
page = 1
while True:
projects = vikunja_get(vikunja_token, "projects", page=page, per_page=50)
if not projects:
break
for p in projects:
if p.get("title", "").strip().lower() == project_name.strip().lower():
log.info("Found project '%s' id=%d", project_name, p["id"])
return p["id"]
if len(projects) < 50:
break
page += 1
raise RuntimeError(f"Project '{project_name}' not found in Vikunja")
def find_kanban_view_id(vikunja_token: str, project_id: int) -> int:
"""Find the kanban view for a project."""
views = vikunja_get(vikunja_token, f"projects/{project_id}/views")
for v in views:
if v.get("view_kind") == "kanban":
log.info("Found kanban view id=%d", v["id"])
return v["id"]
raise RuntimeError(f"No kanban view found for project {project_id}")
def discover_buckets(vikunja_token: str, project_id: int, view_id: int) -> dict[str, int]:
"""
Return a dict mapping normalised bucket name -> bucket id.
Standard names: backlog, todo, in progress, in review, done
"""
buckets = vikunja_get(vikunja_token, f"projects/{project_id}/views/{view_id}/buckets")
mapping = {}
for b in buckets:
name = b.get("title", "").strip().lower()
mapping[name] = b["id"]
log.info("Bucket '%s' id=%d", b["title"], b["id"])
return mapping
def list_todo_tasks(vikunja_token: str, project_id: int) -> list[dict]:
"""Return all undone tasks with agent labels from the project."""
tasks = [] tasks = []
page = 1 page = 1
while True: while True:
resp = httpx.get( batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50)
f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/tasks",
headers={"Authorization": f"Bearer {vikunja_token}"},
params={"page": page, "per_page": 50},
timeout=15,
)
resp.raise_for_status()
batch = resp.json()
if not batch: if not batch:
break break
tasks.extend(batch) tasks.extend(batch)
if len(batch) < 50: if len(batch) < 50:
break break
page += 1 page += 1
# Tasks in Todo bucket have done=False and agent label.
# Since done tasks are marked done=True, filtering undone tasks
# with an agent label is sufficient.
return [t for t in tasks if not t.get("done") and t.get("labels")] return [t for t in tasks if not t.get("done") and t.get("labels")]
def extract_agent_role(task: dict) -> str | None: def extract_agent_role(task: dict) -> str | None:
"""
Return the role name if the task has exactly one `agent:<role>` label
that matches a known role. Returns None otherwise.
"""
labels = task.get("labels") or [] labels = task.get("labels") or []
roles_found = [] roles_found = []
for label in labels: for label in labels:
@ -122,26 +161,17 @@ def extract_agent_role(task: dict) -> str | None:
role = m.group(1) role = m.group(1)
if role in VALID_ROLES: if role in VALID_ROLES:
roles_found.append(role) roles_found.append(role)
if len(roles_found) == 1: return roles_found[0] if len(roles_found) == 1 else None
return roles_found[0]
return None
def claim_task(vikunja_token: str, task_id: int) -> bool: def claim_task(task_id: int) -> bool:
"""Move task to In Progress bucket. Returns True on success.""" """Placeholder — bucket moving deferred. Always returns True."""
resp = httpx.post( return True
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}",
headers={
"Authorization": f"Bearer {vikunja_token}", def unclaim_task(task_id: int) -> None:
"Content-Type": "application/json", """Placeholder — bucket moving deferred."""
}, pass
json={"bucket_id": VIKUNJA_IN_PROGRESS_BUCKET_ID},
timeout=10,
)
if resp.status_code in (200, 201):
return True
log.warning("Failed to claim task %d: %d %s", task_id, resp.status_code, resp.text)
return False
# ── Kubernetes ──────────────────────────────────────────────────────────────── # ── Kubernetes ────────────────────────────────────────────────────────────────
@ -212,13 +242,13 @@ def spawn_agent_job(
image=AGENT_IMAGE, image=AGENT_IMAGE,
image_pull_policy="Always", image_pull_policy="Always",
env=[ env=[
k8s_client.V1EnvVar(name="AGENT_ROLE", value=role), k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)), k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title), k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR), k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"), k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL",value=VIKUNJA_BASE_URL),
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL), k8s_client.V1EnvVar(name="LITELLM_BASE_URL",value="https://llm.ctz.fyi"),
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"), k8s_client.V1EnvVar(name="FORGEJO_BASE_URL",value="https://git.ctz.fyi"),
k8s_client.V1EnvVar( k8s_client.V1EnvVar(
name="OPENBAO_ROLE_ID", name="OPENBAO_ROLE_ID",
value_from=k8s_client.V1EnvVarSource( value_from=k8s_client.V1EnvVarSource(
@ -265,30 +295,41 @@ def main() -> None:
log.info("Dispatcher starting") log.info("Dispatcher starting")
# Auth # Auth
bao_token = get_openbao_token() bao_token = get_openbao_token()
vikunja_token = get_vikunja_token(bao_token) vikunja_token = get_vikunja_token(bao_token)
log.info("Authenticated to OpenBao and Vikunja") log.info("Authenticated to OpenBao and Vikunja")
# Discover project + kanban view + buckets by name
project_id = find_project_id(vikunja_token, VIKUNJA_PROJECT_NAME)
view_id = find_kanban_view_id(vikunja_token, project_id)
buckets = discover_buckets(vikunja_token, project_id, view_id)
todo_id = buckets.get(BUCKET_TODO)
in_progress_id = buckets.get(BUCKET_IN_PROGRESS)
if not todo_id or not in_progress_id:
log.warning("Could not find all standard buckets. Found: %s", list(buckets.keys()))
# k8s # k8s
load_k8s_config() load_k8s_config()
batch_v1 = k8s_client.BatchV1Api() batch_v1 = k8s_client.BatchV1Api()
# Scan tasks # Scan + claim tasks
tasks = list_todo_tasks(vikunja_token) tasks = list_todo_tasks(vikunja_token, project_id)
log.info("Found %d tasks in Todo bucket", len(tasks)) log.info("Found %d candidate tasks", len(tasks))
claimed = 0 claimed = 0
for task in tasks: for task in tasks:
task_id = task["id"] task_id = task["id"]
title = task.get("title", "") title = task.get("title", "")
role = extract_agent_role(task) role = extract_agent_role(task)
if not role: if not role:
log.debug("Task %d has no valid agent label, skipping", task_id) log.debug("Task %d has no valid agent label, skipping", task_id)
continue continue
log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role) log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
if not claim_task(vikunja_token, task_id): if not claim_task(task_id):
continue continue
try: try:
@ -296,13 +337,7 @@ def main() -> None:
claimed += 1 claimed += 1
except Exception as e: except Exception as e:
log.error("Failed to spawn job for task %d: %s", task_id, e) log.error("Failed to spawn job for task %d: %s", task_id, e)
# Un-claim: move back to Todo unclaim_task(task_id)
httpx.post(
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json={"bucket_id": VIKUNJA_TODO_BUCKET_ID},
timeout=10,
)
log.info("Dispatcher done. Claimed %d tasks.", claimed) log.info("Dispatcher done. Claimed %d tasks.", claimed)