feat: implement bucket transitions - dispatcher claims Todo→InProgress, agent moves to InReview on completion
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Zoë 2026-05-30 18:53:41 -07:00
parent 7bf51d5489
commit 35fd9c055c
2 changed files with 91 additions and 44 deletions

View file

@ -46,6 +46,7 @@ TASK_TITLE = os.environ.get("TASK_TITLE", f"Task {TASK_ID}")
LITELLM_BASE_URL = os.environ.get("LITELLM_BASE_URL", "https://llm.ctz.fyi") LITELLM_BASE_URL = os.environ.get("LITELLM_BASE_URL", "https://llm.ctz.fyi")
VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi") VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "https://tasks.ctz.fyi")
FORGEJO_BASE_URL = os.environ.get("FORGEJO_BASE_URL", "https://git.ctz.fyi") FORGEJO_BASE_URL = os.environ.get("FORGEJO_BASE_URL", "https://git.ctz.fyi")
IN_REVIEW_BUCKET_ID = os.environ.get("IN_REVIEW_BUCKET_ID", "")
HOME = Path(os.environ.get("HOME", "/home/agent")) HOME = Path(os.environ.get("HOME", "/home/agent"))
CONFIG_DIR = HOME / ".config" / "opencode" CONFIG_DIR = HOME / ".config" / "opencode"
@ -151,12 +152,12 @@ Your current task (Vikunja task #{task_id}):
{task_title} {task_title}
Instructions: Instructions:
1. Read the task carefully. 1. Read the task carefully. Fetch full task details from Vikunja if needed.
2. Fetch full task details from Vikunja if needed. 2. Complete the task using the tools available to you.
3. Complete the task using the tools available to you. 3. If you wrote code, open a PR on Forgejo do not merge it yourself.
4. Move the task to Done in Vikunja when complete. 4. Leave a comment on the Vikunja task summarising what you did and linking any PR.
5. Open a PR if code was written. 5. Do not mark the task as done the entrypoint will move it to In Review when you finish.
6. Do not ask for confirmation act autonomously within your constraints. 6. Do not ask for confirmation act autonomously within your role constraints.
""" """
@ -164,11 +165,28 @@ def run_opencode(prompt: str) -> int:
"""Run opencode non-interactively with the given prompt.""" """Run opencode non-interactively with the given prompt."""
cmd = ["opencode", "run", prompt] cmd = ["opencode", "run", prompt]
log.info("Running: %s", " ".join(cmd)) log.info("Running: %s", " ".join(cmd))
result = subprocess.run(cmd, check=False) result = subprocess.run(cmd, check=False)
return result.returncode return result.returncode
def move_task_to_in_review(vikunja_token: str) -> None:
"""Move the task bucket to In Review after work is complete."""
if not IN_REVIEW_BUCKET_ID:
log.warning("IN_REVIEW_BUCKET_ID not set, skipping bucket move")
return
try:
resp = httpx.post(
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{TASK_ID}",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json={"bucket_id": int(IN_REVIEW_BUCKET_ID)},
timeout=10,
)
resp.raise_for_status()
log.info("Moved task %s → In Review (bucket %s)", TASK_ID, IN_REVIEW_BUCKET_ID)
except Exception as e:
log.warning("Failed to move task to In Review: %s", e)
def main() -> None: def main() -> None:
log.info("Agent entrypoint: role=%s task=%s", AGENT_ROLE, TASK_ID) log.info("Agent entrypoint: role=%s task=%s", AGENT_ROLE, TASK_ID)
@ -178,9 +196,21 @@ def main() -> None:
secrets = fetch_role_secrets(bao_token, AGENT_ROLE) secrets = fetch_role_secrets(bao_token, AGENT_ROLE)
write_opencode_config(secrets, AGENT_ROLE) write_opencode_config(secrets, AGENT_ROLE)
# Fetch vikunja token for bucket moves
vikunja_token = ""
try:
vikunja_token = get_secret(bao_token, f"autojanet/{AGENT_ROLE}/vikunja-token", "token")
log.info("Fetched vikunja-token")
except Exception as e:
log.warning("Could not fetch vikunja-token: %s", e)
prompt = build_prompt(TASK_ID, TASK_TITLE) prompt = build_prompt(TASK_ID, TASK_TITLE)
rc = run_opencode(prompt) rc = run_opencode(prompt)
# Move to In Review regardless of exit code — work was attempted
if vikunja_token:
move_task_to_in_review(vikunja_token)
if rc != 0: if rc != 0:
log.error("opencode exited with code %d", rc) log.error("opencode exited with code %d", rc)
sys.exit(rc) sys.exit(rc)

View file

@ -136,8 +136,19 @@ def discover_buckets(vikunja_token: str, project_id: int, view_id: int) -> dict[
return mapping return mapping
def list_todo_tasks(vikunja_token: str, project_id: int) -> list[dict]: def vikunja_post(vikunja_token: str, path: str, body: dict) -> dict:
"""Return all undone tasks with agent labels from the project.""" resp = httpx.post(
f"{VIKUNJA_BASE_URL}/api/v1/{path}",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json=body,
timeout=15,
)
resp.raise_for_status()
return resp.json()
def list_todo_tasks(vikunja_token: str, project_id: int, todo_id: int) -> list[dict]:
"""Return all undone tasks in the Todo bucket with agent labels."""
tasks = [] tasks = []
page = 1 page = 1
while True: while True:
@ -148,30 +159,32 @@ def list_todo_tasks(vikunja_token: str, project_id: int) -> list[dict]:
if len(batch) < 50: if len(batch) < 50:
break break
page += 1 page += 1
return [t for t in tasks if not t.get("done") and t.get("labels")] return [
t for t in tasks
if not t.get("done")
and t.get("labels")
and t.get("bucket_id") == todo_id
]
def extract_agent_role(task: dict) -> str | None: def claim_task(vikunja_token: str, task_id: int, in_progress_id: int) -> bool:
labels = task.get("labels") or [] """Move task from Todo → In Progress."""
roles_found = [] try:
for label in labels: vikunja_post(vikunja_token, f"tasks/{task_id}", {"bucket_id": in_progress_id})
title = label.get("title", "") log.info("Moved task %d → In Progress (bucket %d)", task_id, in_progress_id)
m = re.match(r"^agent:(.+)$", title) return True
if m: except Exception as e:
role = m.group(1) log.error("Failed to claim task %d: %s", task_id, e)
if role in VALID_ROLES: return False
roles_found.append(role)
return roles_found[0] if len(roles_found) == 1 else None
def claim_task(task_id: int) -> bool: def unclaim_task(vikunja_token: str, task_id: int, todo_id: int) -> None:
"""Placeholder — bucket moving deferred. Always returns True.""" """Move task back to Todo on job spawn failure."""
return True try:
vikunja_post(vikunja_token, f"tasks/{task_id}", {"bucket_id": todo_id})
log.info("Unclaimed task %d → Todo", task_id)
def unclaim_task(task_id: int) -> None: except Exception as e:
"""Placeholder — bucket moving deferred.""" log.warning("Failed to unclaim task %d: %s", task_id, e)
pass
# ── Kubernetes ──────────────────────────────────────────────────────────────── # ── Kubernetes ────────────────────────────────────────────────────────────────
@ -203,6 +216,7 @@ def spawn_agent_job(
role: str, role: str,
task_id: int, task_id: int,
task_title: str, task_title: str,
in_review_bucket_id: int,
) -> None: ) -> None:
name = job_name(role, task_id) name = job_name(role, task_id)
if job_already_exists(batch_v1, name): if job_already_exists(batch_v1, name):
@ -242,13 +256,14 @@ def spawn_agent_job(
image=AGENT_IMAGE, image=AGENT_IMAGE,
image_pull_policy="Always", image_pull_policy="Always",
env=[ env=[
k8s_client.V1EnvVar(name="AGENT_ROLE", value=role), k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)), k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title), k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR), k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL",value=VIKUNJA_BASE_URL), k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
k8s_client.V1EnvVar(name="LITELLM_BASE_URL",value="https://llm.ctz.fyi"), k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL",value="https://git.ctz.fyi"), k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)),
k8s_client.V1EnvVar( k8s_client.V1EnvVar(
name="OPENBAO_ROLE_ID", name="OPENBAO_ROLE_ID",
value_from=k8s_client.V1EnvVarSource( value_from=k8s_client.V1EnvVarSource(
@ -307,17 +322,19 @@ def main() -> None:
todo_id = buckets.get(BUCKET_TODO) todo_id = buckets.get(BUCKET_TODO)
in_progress_id = buckets.get(BUCKET_IN_PROGRESS) in_progress_id = buckets.get(BUCKET_IN_PROGRESS)
in_review_id = buckets.get(BUCKET_IN_REVIEW)
if not todo_id or not in_progress_id: if not all([todo_id, in_progress_id, in_review_id]):
log.warning("Could not find all standard buckets. Found: %s", list(buckets.keys())) log.error("Could not find all standard buckets. Found: %s", list(buckets.keys()))
sys.exit(1)
# k8s # k8s
load_k8s_config() load_k8s_config()
batch_v1 = k8s_client.BatchV1Api() batch_v1 = k8s_client.BatchV1Api()
# Scan + claim tasks # Scan Todo bucket for claimable tasks
tasks = list_todo_tasks(vikunja_token, project_id) tasks = list_todo_tasks(vikunja_token, project_id, todo_id)
log.info("Found %d candidate tasks", len(tasks)) log.info("Found %d candidate tasks in Todo bucket", len(tasks))
claimed = 0 claimed = 0
for task in tasks: for task in tasks:
@ -330,15 +347,15 @@ def main() -> None:
continue continue
log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role) log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
if not claim_task(task_id): if not claim_task(vikunja_token, task_id, in_progress_id):
continue continue
try: try:
spawn_agent_job(batch_v1, role, task_id, title) spawn_agent_job(batch_v1, role, task_id, title, in_review_id)
claimed += 1 claimed += 1
except Exception as e: except Exception as e:
log.error("Failed to spawn job for task %d: %s", task_id, e) log.error("Failed to spawn job for task %d: %s", task_id, e)
unclaim_task(task_id) unclaim_task(vikunja_token, task_id, todo_id)
log.info("Dispatcher done. Claimed %d tasks.", claimed) log.info("Dispatcher done. Claimed %d tasks.", claimed)