All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
/projects/{id}/tasks returns bucket_id=0 for all tasks.
Use /projects/{p}/views/{v}/buckets/{b}/tasks instead.
308 lines
12 KiB
Python
308 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AutoJanet Dispatcher
|
|
|
|
Runs as a CronJob every 2 minutes. Scans Vikunja project 78 for tasks in the
|
|
Todo bucket that have an `agent:<role>` label. Claims each task (moves to
|
|
In Progress) and spawns a Kubernetes Job for the appropriate agent.
|
|
|
|
Requirements:
|
|
- OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID — for fetching Vikunja token
|
|
- VIKUNJA_BASE_URL, VIKUNJA_PROJECT_ID, VIKUNJA_TODO_BUCKET_ID
|
|
- K8S_NAMESPACE, AGENT_IMAGE
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from string import Template
|
|
|
|
import httpx
|
|
from kubernetes import client as k8s_client, config as k8s_config
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(message)s",
|
|
stream=sys.stdout,
|
|
)
|
|
log = logging.getLogger("dispatcher")
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
OPENBAO_ADDR = os.environ["OPENBAO_ADDR"]
|
|
OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"]
|
|
OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"]
|
|
|
|
VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi")
|
|
VIKUNJA_PROJECT_ID = int(os.environ.get("VIKUNJA_PROJECT_ID", "78"))
|
|
VIKUNJA_TODO_BUCKET_ID = int(os.environ.get("VIKUNJA_TODO_BUCKET_ID", "116"))
|
|
VIKUNJA_IN_PROGRESS_BUCKET_ID = int(os.environ.get("VIKUNJA_IN_PROGRESS_BUCKET_ID", "117"))
|
|
VIKUNJA_KANBAN_VIEW_ID = int(os.environ.get("VIKUNJA_KANBAN_VIEW_ID", "114"))
|
|
|
|
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
|
|
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest")
|
|
|
|
VALID_ROLES = {
|
|
"pm", "coder", "code-reviewer", "test-engineer", "devsecops", "secops",
|
|
"sre", "kubernetes-pilot", "linux-admin", "systems-engineer", "networking",
|
|
"dba", "prometheus-expert", "tofu-engineer", "release-manager",
|
|
"doc-updater", "doc-writer", "technical-writer", "cost-optimizer",
|
|
}
|
|
|
|
# ── OpenBao ───────────────────────────────────────────────────────────────────
|
|
|
|
def get_openbao_token() -> str:
|
|
"""Authenticate to OpenBao via AppRole and return a client token."""
|
|
resp = httpx.post(
|
|
f"{OPENBAO_ADDR}/v1/auth/approle/login",
|
|
json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID},
|
|
timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()["auth"]["client_token"]
|
|
|
|
|
|
def get_secret(bao_token: str, path: str, key: str) -> str:
|
|
"""Read a KV v2 secret from OpenBao."""
|
|
resp = httpx.get(
|
|
f"{OPENBAO_ADDR}/v1/secret/data/{path}",
|
|
headers={"X-Vault-Token": bao_token},
|
|
timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()["data"]["data"][key]
|
|
|
|
|
|
# ── Vikunja ───────────────────────────────────────────────────────────────────
|
|
|
|
def get_vikunja_token(bao_token: str) -> str:
|
|
"""Fetch the dispatcher's Vikunja token from OpenBao."""
|
|
return get_secret(bao_token, "autojanet/pm/vikunja-token", "token")
|
|
|
|
|
|
def list_todo_tasks(vikunja_token: str) -> list[dict]:
|
|
"""Return all tasks in the Todo bucket of the AutoJanet project."""
|
|
tasks = []
|
|
page = 1
|
|
while True:
|
|
resp = httpx.get(
|
|
f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/views/{VIKUNJA_KANBAN_VIEW_ID}/buckets/{VIKUNJA_TODO_BUCKET_ID}/tasks",
|
|
headers={"Authorization": f"Bearer {vikunja_token}"},
|
|
params={"page": page, "per_page": 50},
|
|
timeout=15,
|
|
)
|
|
resp.raise_for_status()
|
|
batch = resp.json()
|
|
if not batch:
|
|
break
|
|
tasks.extend(batch)
|
|
if len(batch) < 50:
|
|
break
|
|
page += 1
|
|
return tasks
|
|
|
|
|
|
def extract_agent_role(task: dict) -> str | None:
|
|
"""
|
|
Return the role name if the task has exactly one `agent:<role>` label
|
|
that matches a known role. Returns None otherwise.
|
|
"""
|
|
labels = task.get("labels") or []
|
|
roles_found = []
|
|
for label in labels:
|
|
title = label.get("title", "")
|
|
m = re.match(r"^agent:(.+)$", title)
|
|
if m:
|
|
role = m.group(1)
|
|
if role in VALID_ROLES:
|
|
roles_found.append(role)
|
|
if len(roles_found) == 1:
|
|
return roles_found[0]
|
|
return None
|
|
|
|
|
|
def claim_task(vikunja_token: str, task_id: int) -> bool:
|
|
"""Move task to In Progress bucket. Returns True on success."""
|
|
resp = httpx.post(
|
|
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}",
|
|
headers={
|
|
"Authorization": f"Bearer {vikunja_token}",
|
|
"Content-Type": "application/json",
|
|
},
|
|
json={"bucket_id": VIKUNJA_IN_PROGRESS_BUCKET_ID},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code in (200, 201):
|
|
return True
|
|
log.warning("Failed to claim task %d: %d %s", task_id, resp.status_code, resp.text)
|
|
return False
|
|
|
|
|
|
# ── Kubernetes ────────────────────────────────────────────────────────────────
|
|
|
|
def load_k8s_config() -> None:
|
|
try:
|
|
k8s_config.load_incluster_config()
|
|
except k8s_config.ConfigException:
|
|
k8s_config.load_kube_config()
|
|
|
|
|
|
def job_name(role: str, task_id: int) -> str:
|
|
safe_role = role.replace("-", "")[:12]
|
|
return f"agent-{safe_role}-{task_id}"
|
|
|
|
|
|
def job_already_exists(batch_v1: k8s_client.BatchV1Api, name: str) -> bool:
|
|
try:
|
|
batch_v1.read_namespaced_job(name=name, namespace=K8S_NAMESPACE)
|
|
return True
|
|
except k8s_client.ApiException as e:
|
|
if e.status == 404:
|
|
return False
|
|
raise
|
|
|
|
|
|
def spawn_agent_job(
|
|
batch_v1: k8s_client.BatchV1Api,
|
|
role: str,
|
|
task_id: int,
|
|
task_title: str,
|
|
) -> None:
|
|
name = job_name(role, task_id)
|
|
if job_already_exists(batch_v1, name):
|
|
log.info("Job %s already exists, skipping", name)
|
|
return
|
|
|
|
job = k8s_client.V1Job(
|
|
api_version="batch/v1",
|
|
kind="Job",
|
|
metadata=k8s_client.V1ObjectMeta(
|
|
name=name,
|
|
namespace=K8S_NAMESPACE,
|
|
labels={
|
|
"autojanet/type": "agent",
|
|
"autojanet/role": role,
|
|
"autojanet/task-id": str(task_id),
|
|
},
|
|
),
|
|
spec=k8s_client.V1JobSpec(
|
|
ttl_seconds_after_finished=3600,
|
|
backoff_limit=1,
|
|
template=k8s_client.V1PodTemplateSpec(
|
|
metadata=k8s_client.V1ObjectMeta(
|
|
labels={
|
|
"autojanet/type": "agent",
|
|
"autojanet/role": role,
|
|
"autojanet/task-id": str(task_id),
|
|
}
|
|
),
|
|
spec=k8s_client.V1PodSpec(
|
|
service_account_name=f"agent-{role}",
|
|
restart_policy="Never",
|
|
node_selector={"kubernetes.io/arch": "amd64"},
|
|
containers=[
|
|
k8s_client.V1Container(
|
|
name="agent",
|
|
image=AGENT_IMAGE,
|
|
image_pull_policy="Always",
|
|
env=[
|
|
k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
|
|
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
|
|
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
|
|
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
|
|
k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
|
|
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
|
|
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
|
|
k8s_client.V1EnvVar(
|
|
name="OPENBAO_ROLE_ID",
|
|
value_from=k8s_client.V1EnvVarSource(
|
|
secret_key_ref=k8s_client.V1SecretKeySelector(
|
|
name=f"agent-{role}-approle",
|
|
key="role_id",
|
|
)
|
|
),
|
|
),
|
|
k8s_client.V1EnvVar(
|
|
name="OPENBAO_SECRET_ID",
|
|
value_from=k8s_client.V1EnvVarSource(
|
|
secret_key_ref=k8s_client.V1SecretKeySelector(
|
|
name=f"agent-{role}-approle",
|
|
key="secret_id",
|
|
)
|
|
),
|
|
),
|
|
],
|
|
resources=k8s_client.V1ResourceRequirements(
|
|
requests={"cpu": "250m", "memory": "512Mi"},
|
|
limits={"cpu": "2000m", "memory": "2Gi"},
|
|
),
|
|
security_context=k8s_client.V1SecurityContext(
|
|
allow_privilege_escalation=False,
|
|
run_as_non_root=True,
|
|
run_as_user=1000,
|
|
capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
|
|
),
|
|
)
|
|
],
|
|
),
|
|
),
|
|
),
|
|
)
|
|
|
|
batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job)
|
|
log.info("Spawned job %s for role=%s task=%d", name, role, task_id)
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main() -> None:
|
|
log.info("Dispatcher starting")
|
|
|
|
# Auth
|
|
bao_token = get_openbao_token()
|
|
vikunja_token = get_vikunja_token(bao_token)
|
|
log.info("Authenticated to OpenBao and Vikunja")
|
|
|
|
# k8s
|
|
load_k8s_config()
|
|
batch_v1 = k8s_client.BatchV1Api()
|
|
|
|
# Scan tasks
|
|
tasks = list_todo_tasks(vikunja_token)
|
|
log.info("Found %d tasks in Todo bucket", len(tasks))
|
|
|
|
claimed = 0
|
|
for task in tasks:
|
|
task_id = task["id"]
|
|
title = task.get("title", "")
|
|
role = extract_agent_role(task)
|
|
|
|
if not role:
|
|
log.debug("Task %d has no valid agent label, skipping", task_id)
|
|
continue
|
|
|
|
log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
|
|
if not claim_task(vikunja_token, task_id):
|
|
continue
|
|
|
|
try:
|
|
spawn_agent_job(batch_v1, role, task_id, title)
|
|
claimed += 1
|
|
except Exception as e:
|
|
log.error("Failed to spawn job for task %d: %s", task_id, e)
|
|
# Un-claim: move back to Todo
|
|
httpx.post(
|
|
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}",
|
|
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
|
|
json={"bucket_id": VIKUNJA_TODO_BUCKET_ID},
|
|
timeout=10,
|
|
)
|
|
|
|
log.info("Dispatcher done. Claimed %d tasks.", claimed)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|