# Kubernetes applies this backoffLimit when a Job spec leaves the field unset.
_DEFAULT_BACKOFF_LIMIT = 6


def _job_is_finished(job) -> bool:
    """Return True when *job* has reached a terminal state (succeeded or failed)."""
    status = job.status
    if status is None:
        # No status reported yet, so the Job cannot have finished.
        return False

    # Terminal states are recorded as conditions; trust them when present.
    for cond in status.conditions or ():
        if cond.type in ("Complete", "Failed") and cond.status == "True":
            return True

    if (status.succeeded or 0) > 0:
        return True

    # Fall back to the pod failure counter. ``backoff_limit`` may legitimately
    # be 0 ("never retry"), so test for None explicitly rather than using
    # ``or``, which would silently turn 0 into the fallback value.
    spec = job.spec
    backoff_limit = spec.backoff_limit if spec is not None else None
    if backoff_limit is None:
        backoff_limit = _DEFAULT_BACKOFF_LIMIT
    return (status.failed or 0) > backoff_limit


def count_active_jobs(
    batch_v1: "k8s_client.BatchV1Api",
    namespace: "str | None" = None,
) -> tuple[int, dict[str, int]]:
    """Count agent jobs that have not yet finished.

    Jobs are selected with the label selector ``autojanet/type=agent``. A job
    is ignored once it is finished: it carries a true ``Complete`` or
    ``Failed`` condition, has at least one succeeded pod, or has more failed
    pods than its backoff limit allows.

    Args:
        batch_v1: Batch API client used to list the jobs.
        namespace: Namespace to inspect. Defaults to ``K8S_NAMESPACE``.

    Returns:
        ``(total_active, {role: count})`` where the role is read from the
        ``autojanet/role`` label (``"unknown"`` when the label is missing).
    """
    target_ns = namespace if namespace is not None else K8S_NAMESPACE
    jobs = batch_v1.list_namespaced_job(
        namespace=target_ns,
        label_selector="autojanet/type=agent",
        _request_timeout=15,
    )

    total = 0
    by_role: dict[str, int] = {}
    for job in jobs.items:
        if _job_is_finished(job):
            continue
        total += 1
        labels = job.metadata.labels or {}
        role = labels.get("autojanet/role", "unknown")
        by_role[role] = by_role.get(role, 0) + 1
    return total, by_role
log.info("Active jobs: total=%d limits(per_role=%d, total=%d)", total_active, MAX_JOBS_PER_ROLE, MAX_JOBS_TOTAL) + claimed = 0 for task in tasks: task_id = task["id"] @@ -390,6 +418,14 @@ def main() -> None: log.debug("Task %d has no valid agent label, skipping", task_id) continue + # Enforce concurrency limits + if total_active >= MAX_JOBS_TOTAL: + log.info("Global job limit reached (%d/%d), stopping", total_active, MAX_JOBS_TOTAL) + break + if active_by_role.get(role, 0) >= MAX_JOBS_PER_ROLE: + log.info("Role %s at limit (%d/%d), skipping task %d", role, active_by_role.get(role, 0), MAX_JOBS_PER_ROLE, task_id) + continue + log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role) if not claim_task(vikunja_token, task_id, in_progress_id): continue @@ -397,6 +433,8 @@ def main() -> None: try: spawn_agent_job(batch_v1, role, task_id, title, in_review_id, project_id, view_id) claimed += 1 + total_active += 1 + active_by_role[role] = active_by_role.get(role, 0) + 1 except Exception as e: log.error("Failed to spawn job for task %d: %s", task_id, e) unclaim_task(vikunja_token, task_id, todo_id) diff --git a/intake/main.py b/intake/main.py new file mode 100644 index 0000000..a97f343 --- /dev/null +++ b/intake/main.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +""" +AutoJanet Intake Service + +Accepts task submissions and creates Vikunja tasks with the appropriate +agent label so the dispatcher picks them up automatically. 
def _vikunja_follow_up(client, method, url, payload, task_id, failure_msg):
    """Run a non-critical Vikunja call; log a warning instead of raising.

    ``failure_msg`` is a logging template taking ``(task_id, reason)``.
    """
    try:
        resp = client.request(method, url, headers=VIKUNJA_HEADERS, json=payload)
    except httpx.HTTPError as exc:
        log.warning(failure_msg, task_id, exc)
        return
    if resp.status_code not in (200, 201):
        log.warning(failure_msg, task_id, resp.text)


@app.post("/task", response_model=TaskResponse)
def submit_task(req: TaskRequest):
    """Create a Vikunja task labelled for an agent role and move it to Todo.

    The task is created first; moving it into the Todo bucket and re-applying
    the role label are best-effort follow-ups whose failures are only logged,
    because the task already exists by then and failing the request would
    invite duplicate submissions on retry.

    Raises:
        HTTPException: 400 for an unknown role or a blank title; 502 when
            Vikunja cannot be reached, rejects the creation, or returns a
            response without a usable task id.
    """
    role = req.role.strip().lower()
    if role not in ROLE_LABEL_IDS:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown role '{role}'. Valid: {sorted(ROLE_LABEL_IDS)}",
        )
    if not req.title.strip():
        raise HTTPException(status_code=400, detail="Task title must not be empty")

    label_id = ROLE_LABEL_IDS[role]

    with httpx.Client(timeout=15) as client:
        # Create the task. Transport failures (timeout, refused connection)
        # are reported as 502 rather than escaping as an unhandled 500.
        try:
            resp = client.put(
                f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/tasks",
                headers=VIKUNJA_HEADERS,
                json={
                    "title": req.title,
                    "description": req.description,
                    "labels": [{"id": label_id}],
                },
            )
        except httpx.HTTPError as exc:
            log.error("Vikunja task creation failed: %s", exc)
            raise HTTPException(status_code=502, detail="Failed to create Vikunja task") from exc

        if resp.status_code not in (200, 201):
            log.error("Vikunja task creation failed: %s %s", resp.status_code, resp.text)
            raise HTTPException(status_code=502, detail="Failed to create Vikunja task")

        try:
            task_id = int(resp.json()["id"])
        except (ValueError, KeyError, TypeError) as exc:
            log.error("Vikunja returned an unusable task payload: %s", resp.text)
            raise HTTPException(status_code=502, detail="Failed to create Vikunja task") from exc
        log.info("Created task %d: %s (role=%s)", task_id, req.title, role)

        # Move to Todo bucket
        _vikunja_follow_up(
            client,
            "POST",
            f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/views/{VIKUNJA_VIEW_ID}/buckets/{VIKUNJA_TODO_BUCKET_ID}/tasks",
            {"task_id": task_id},
            task_id,
            "Failed to move task %d to Todo bucket: %s",
        )

        # Set the label explicitly (belt and suspenders)
        _vikunja_follow_up(
            client,
            "PUT",
            f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/labels",
            {"label_id": label_id},
            task_id,
            "Failed to set label on task %d: %s",
        )

    return TaskResponse(
        task_id=task_id,
        title=req.title,
        role=role,
        url=f"https://tasks.ctz.fyi/projects/{VIKUNJA_PROJECT_ID}/tasks/{task_id}",
    )
"117" - name: K8S_NAMESPACE value: "autojanet" - name: AGENT_IMAGE value: "registry.ctz.fyi/library/autojanet-agent:latest" + - name: MAX_JOBS_PER_ROLE + value: "2" + - name: MAX_JOBS_TOTAL + value: "10" resources: requests: cpu: "100m"