fix: bucket name is 'review' not 'in review'; add concurrency limits; intake service scaffold
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Zoë 2026-05-30 20:21:37 -07:00
parent c6ad5d008f
commit be03d042ad
3 changed files with 177 additions and 7 deletions

View file

@ -42,11 +42,15 @@ VIKUNJA_PROJECT_NAME = os.environ.get("VIKUNJA_PROJECT_NAME", "Autonomous Agent
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest")
# Max concurrent jobs per role (and global across all roles)
MAX_JOBS_PER_ROLE = int(os.environ.get("MAX_JOBS_PER_ROLE", "2"))
MAX_JOBS_TOTAL = int(os.environ.get("MAX_JOBS_TOTAL", "10"))
# Standard bucket names (case-insensitive match)
BUCKET_BACKLOG = "backlog"
BUCKET_TODO = "todo"
BUCKET_IN_PROGRESS = "in progress"
BUCKET_IN_REVIEW = "in review"
BUCKET_IN_REVIEW = "review"
BUCKET_DONE = "done"
VALID_ROLES = {
@ -232,6 +236,26 @@ def load_k8s_config() -> None:
k8s_config.load_kube_config()
def count_active_jobs(batch_v1: k8s_client.BatchV1Api) -> tuple[int, dict[str, int]]:
"""Return (total_active, {role: count}) for all non-completed agent jobs."""
jobs = batch_v1.list_namespaced_job(
namespace=K8S_NAMESPACE,
label_selector="autojanet/type=agent",
_request_timeout=15,
)
total = 0
by_role: dict[str, int] = {}
for job in jobs.items:
# Skip completed/failed jobs
status = job.status
if (status.succeeded or 0) > 0 or (status.failed or 0) >= (job.spec.backoff_limit or 1) + 1:
continue
total += 1
role = job.metadata.labels.get("autojanet/role", "unknown")
by_role[role] = by_role.get(role, 0) + 1
return total, by_role
def job_name(role: str, task_id: int) -> str:
safe_role = role.replace("-", "")[:12]
return f"agent-{safe_role}-{task_id}"
@ -380,6 +404,10 @@ def main() -> None:
tasks = list_todo_tasks(vikunja_token, project_id, todo_id)
log.info("Found %d candidate tasks in Todo bucket", len(tasks))
# Check current job counts before spawning anything
total_active, active_by_role = count_active_jobs(batch_v1)
log.info("Active jobs: total=%d limits(per_role=%d, total=%d)", total_active, MAX_JOBS_PER_ROLE, MAX_JOBS_TOTAL)
claimed = 0
for task in tasks:
task_id = task["id"]
@ -390,6 +418,14 @@ def main() -> None:
log.debug("Task %d has no valid agent label, skipping", task_id)
continue
# Enforce concurrency limits
if total_active >= MAX_JOBS_TOTAL:
log.info("Global job limit reached (%d/%d), stopping", total_active, MAX_JOBS_TOTAL)
break
if active_by_role.get(role, 0) >= MAX_JOBS_PER_ROLE:
log.info("Role %s at limit (%d/%d), skipping task %d", role, active_by_role.get(role, 0), MAX_JOBS_PER_ROLE, task_id)
continue
log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
if not claim_task(vikunja_token, task_id, in_progress_id):
continue
@ -397,6 +433,8 @@ def main() -> None:
try:
spawn_agent_job(batch_v1, role, task_id, title, in_review_id, project_id, view_id)
claimed += 1
total_active += 1
active_by_role[role] = active_by_role.get(role, 0) + 1
except Exception as e:
log.error("Failed to spawn job for task %d: %s", task_id, e)
unclaim_task(vikunja_token, task_id, todo_id)

134
intake/main.py Normal file
View file

@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
AutoJanet Intake Service
Accepts task submissions and creates Vikunja tasks with the appropriate
agent label so the dispatcher picks them up automatically.
POST /task
{
"title": "Add dark mode to the dashboard",
"description": "...", # optional
"role": "coder" # optional, defaults to "pm" (PM decomposes)
}
GET /health
"""
import logging
import os
import sys
import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stdout)
log = logging.getLogger("intake")
VIKUNJA_BASE_URL = os.environ["VIKUNJA_BASE_URL"]
VIKUNJA_PM_TOKEN = os.environ["VIKUNJA_PM_TOKEN"]
VIKUNJA_PROJECT_ID = int(os.environ.get("VIKUNJA_PROJECT_ID", "78"))
VIKUNJA_TODO_BUCKET_ID = int(os.environ.get("VIKUNJA_TODO_BUCKET_ID", "116"))
VIKUNJA_VIEW_ID = int(os.environ.get("VIKUNJA_VIEW_ID", "114"))
# Label IDs for agent roles (from Vikunja)
ROLE_LABEL_IDS = {
"pm": 1,
"coder": 3,
"code-reviewer": 4,
"test-engineer": 5,
"devsecops": 6,
"secops": 7,
"sre": 8,
"kubernetes-pilot": 9,
"linux-admin": 10,
"systems-engineer": 11,
"networking": 12,
"dba": 13,
"prometheus-expert": 14,
"tofu-engineer": 15,
"release-manager": 16,
"doc-updater": 17,
"doc-writer": 18,
"technical-writer": 19,
"cost-optimizer": 20,
}
app = FastAPI(title="AutoJanet Intake", version="1.0.0")
VIKUNJA_HEADERS = {
"Authorization": f"Bearer {VIKUNJA_PM_TOKEN}",
"Content-Type": "application/json",
}
class TaskRequest(BaseModel):
title: str
description: str = ""
role: str = "pm"
class TaskResponse(BaseModel):
task_id: int
title: str
role: str
url: str
@app.get("/health")
def health():
return {"status": "ok"}
@app.post("/task", response_model=TaskResponse)
def submit_task(req: TaskRequest):
role = req.role.lower()
if role not in ROLE_LABEL_IDS:
raise HTTPException(status_code=400, detail=f"Unknown role '{role}'. Valid: {sorted(ROLE_LABEL_IDS)}")
label_id = ROLE_LABEL_IDS[role]
# Create the task
with httpx.Client(timeout=15) as client:
resp = client.put(
f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/tasks",
headers=VIKUNJA_HEADERS,
json={
"title": req.title,
"description": req.description,
"labels": [{"id": label_id}],
},
)
if resp.status_code not in (200, 201):
log.error("Vikunja task creation failed: %s %s", resp.status_code, resp.text)
raise HTTPException(status_code=502, detail="Failed to create Vikunja task")
task = resp.json()
task_id = task["id"]
log.info("Created task %d: %s (role=%s)", task_id, req.title, role)
# Move to Todo bucket
bucket_resp = client.post(
f"{VIKUNJA_BASE_URL}/api/v1/projects/{VIKUNJA_PROJECT_ID}/views/{VIKUNJA_VIEW_ID}/buckets/{VIKUNJA_TODO_BUCKET_ID}/tasks",
headers=VIKUNJA_HEADERS,
json={"task_id": task_id},
)
if bucket_resp.status_code not in (200, 201):
log.warning("Failed to move task %d to Todo bucket: %s", task_id, bucket_resp.text)
# Set the label explicitly (belt and suspenders)
label_resp = client.put(
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/labels",
headers=VIKUNJA_HEADERS,
json={"label_id": label_id},
)
if label_resp.status_code not in (200, 201):
log.warning("Failed to set label on task %d: %s", task_id, label_resp.text)
return TaskResponse(
task_id=task_id,
title=req.title,
role=role,
url=f"https://tasks.ctz.fyi/projects/{VIKUNJA_PROJECT_ID}/tasks/{task_id}",
)

View file

@ -42,16 +42,14 @@ spec:
key: secret_id
- name: VIKUNJA_BASE_URL
value: "http://vikunja.vikunja.svc.cluster.local:3456"
- name: VIKUNJA_PROJECT_ID
value: "78"
- name: VIKUNJA_TODO_BUCKET_ID
value: "116"
- name: VIKUNJA_IN_PROGRESS_BUCKET_ID
value: "117"
- name: K8S_NAMESPACE
value: "autojanet"
- name: AGENT_IMAGE
value: "registry.ctz.fyi/library/autojanet-agent:latest"
- name: MAX_JOBS_PER_ROLE
value: "2"
- name: MAX_JOBS_TOTAL
value: "10"
resources:
requests:
cpu: "100m"