feat: stale watchdog, review orchestration, loop-break prompt
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Zoë 2026-05-30 20:55:52 -07:00
parent 7b5eb15292
commit fbc5e33292
2 changed files with 298 additions and 6 deletions

View file

@ -164,6 +164,12 @@ Instructions:
5. Leave a comment on the Vikunja task summarising what you did and linking any PR.
6. Do not mark the task as done the entrypoint will move it to In Review when you finish.
7. Do not ask for confirmation act autonomously within your role constraints.
IMPORTANT avoid looping:
- If you attempt the same fix more than twice and it is still not working, STOP.
- Do not keep retrying the same approach hoping for a different result.
- Instead: leave a Vikunja comment explaining exactly what you tried and what is blocking you, then exit.
- A human will read it and unblock you. Spinning wastes time and money.
"""

View file

@ -3,9 +3,13 @@
AutoJanet Dispatcher
Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by
name, resolves the kanban view and all 5 standard bucket IDs by name, then
claims tasks from the Todo bucket that have an `agent:<role>` label and
spawns a Kubernetes Job for the appropriate agent.
name, resolves the kanban view and all 5 standard bucket IDs by name, then:
1. Sweeps done=True tasks into the Done bucket.
2. Watchdog: moves stale InProgress tasks (dead job) back to Todo.
3. Review orchestration: spawns a PM agent for each task in Review that
needs sub-tasks created (code-review, security-review, etc.).
4. Claims tasks from the Todo bucket and spawns agent Jobs.
Config (env vars):
OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID
@ -45,6 +49,7 @@ AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojane
# Max concurrent jobs per role (and global across all roles)
MAX_JOBS_PER_ROLE = int(os.environ.get("MAX_JOBS_PER_ROLE", "2"))
MAX_JOBS_TOTAL = int(os.environ.get("MAX_JOBS_TOTAL", "10"))
MAX_TASK_RETRIES = int(os.environ.get("MAX_TASK_RETRIES", "3"))
# Standard bucket names (case-insensitive match)
BUCKET_BACKLOG = "backlog"
@ -227,6 +232,276 @@ def sweep_done_tasks(
log.info("Done sweep: moved %d tasks", moved)
# ── Role extraction ───────────────────────────────────────────────────────────
def extract_agent_role(task: dict) -> str | None:
"""Return the agent role from an agent:<role> label, or None."""
for label in task.get("labels") or []:
title = label.get("title", "")
if title.startswith("agent:"):
role = title[len("agent:"):]
if role in VALID_ROLES:
return role
return None
# ── Stale watchdog ────────────────────────────────────────────────────────────
def count_jobs_for_task(batch_v1: k8s_client.BatchV1Api, task_id: int) -> tuple[int, int]:
"""
Return (active_count, total_ever) for all jobs with this task_id label.
active = job has no terminal outcome yet (still running or pending).
total_ever = ALL jobs ever spawned for this task, regardless of outcome.
Used to detect both dead-job stale and successful-but-looping scenarios.
"""
jobs = batch_v1.list_namespaced_job(
namespace=K8S_NAMESPACE,
label_selector=f"autojanet/task-id={task_id}",
_request_timeout=15,
)
active = 0
total_ever = len(jobs.items)
for job in jobs.items:
s = job.status
is_succeeded = (s.succeeded or 0) > 0
is_failed = (s.failed or 0) >= (job.spec.backoff_limit or 1) + 1
if not is_succeeded and not is_failed:
active += 1
return active, total_ever
def post_stale_comment(vikunja_token: str, task_id: int, reason: str) -> None:
try:
httpx.put(
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/comments",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json={"comment": reason},
timeout=10,
).raise_for_status()
except Exception as e:
log.warning("Failed to post stale comment on task %d: %s", task_id, e)
def watchdog_stale_tasks(
vikunja_token: str,
batch_v1: k8s_client.BatchV1Api,
project_id: int,
view_id: int,
in_progress_id: int,
todo_id: int,
backlog_id: int,
) -> None:
"""
For every task stuck in InProgress, check whether its Job is still alive.
- No job found or job terminally failed:
- attempt count < MAX_TASK_RETRIES move back to Todo
- attempt count >= MAX_TASK_RETRIES move to Backlog + comment
"""
page = 1
tasks = []
while True:
batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50)
if not batch:
break
tasks.extend(batch)
if len(batch) < 50:
break
page += 1
stale = [t for t in tasks if not t.get("done") and t.get("bucket_id") == in_progress_id]
log.info("Watchdog: checking %d InProgress tasks", len(stale))
for task in stale:
task_id = task["id"]
active, total_ever = count_jobs_for_task(batch_v1, task_id)
if active > 0:
log.debug("Task %d has %d active job(s), leaving alone", task_id, active)
continue
# No active job — task is stuck or looping
log.warning("Task %d is stale: active=%d total_attempts=%d", task_id, active, total_ever)
if total_ever >= MAX_TASK_RETRIES:
log.error("Task %d hit retry limit (%d/%d), moving to Backlog", task_id, total_ever, MAX_TASK_RETRIES)
try:
vikunja_post(
vikunja_token,
f"projects/{project_id}/views/{view_id}/buckets/{backlog_id}/tasks",
{"task_id": task_id},
)
except Exception as e:
log.warning("Failed to move task %d to Backlog: %s", task_id, e)
post_stale_comment(
vikunja_token, task_id,
f"🚨 **Watchdog**: task has been attempted {total_ever} time(s) and hit the retry limit "
f"(`MAX_TASK_RETRIES={MAX_TASK_RETRIES}`). Moved to **Backlog** for manual review."
)
else:
log.info("Task %d attempt %d/%d, requeueing to Todo", task_id, total_ever + 1, MAX_TASK_RETRIES)
try:
vikunja_post(
vikunja_token,
f"projects/{project_id}/views/{view_id}/buckets/{todo_id}/tasks",
{"task_id": task_id},
)
except Exception as e:
log.warning("Failed to requeue task %d to Todo: %s", task_id, e)
post_stale_comment(
vikunja_token, task_id,
f"⚠️ **Watchdog**: no active job found for this task (attempt {total_ever + 1}/{MAX_TASK_RETRIES}). "
f"Requeued to **Todo** for retry."
)
# ── Review orchestration ──────────────────────────────────────────────────────
# Roles that should NOT trigger review orchestration (would cause loops)
REVIEW_SKIP_ROLES = {"pm", "code-reviewer"}
def spawn_review_pm_job(
batch_v1: k8s_client.BatchV1Api,
task_id: int,
task_title: str,
in_review_bucket_id: int,
project_id: int,
view_id: int,
) -> None:
"""Spawn a PM agent job to orchestrate review sub-tasks for a completed task."""
name = f"review-pm-{task_id}"
if job_already_exists(batch_v1, name):
log.debug("Review PM job %s already exists, skipping", name)
return
job = k8s_client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=k8s_client.V1ObjectMeta(
name=name,
namespace=K8S_NAMESPACE,
labels={
"autojanet/type": "review-pm",
"autojanet/role": "pm",
"autojanet/task-id": str(task_id),
},
),
spec=k8s_client.V1JobSpec(
ttl_seconds_after_finished=3600,
backoff_limit=1,
template=k8s_client.V1PodTemplateSpec(
metadata=k8s_client.V1ObjectMeta(
labels={
"autojanet/type": "review-pm",
"autojanet/role": "pm",
"autojanet/task-id": str(task_id),
}
),
spec=k8s_client.V1PodSpec(
service_account_name="agent-pm",
restart_policy="Never",
node_selector={"kubernetes.io/arch": "amd64"},
containers=[
k8s_client.V1Container(
name="agent",
image=AGENT_IMAGE,
image_pull_policy="Always",
env=[
k8s_client.V1EnvVar(name="AGENT_ROLE", value="pm"),
k8s_client.V1EnvVar(name="TASK_TYPE", value="review_orchestration"),
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)),
k8s_client.V1EnvVar(name="VIKUNJA_PROJECT_ID", value=str(project_id)),
k8s_client.V1EnvVar(name="VIKUNJA_VIEW_ID", value=str(view_id)),
k8s_client.V1EnvVar(
name="OPENBAO_ROLE_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name="agent-pm-approle", key="role_id",
)
),
),
k8s_client.V1EnvVar(
name="OPENBAO_SECRET_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name="agent-pm-approle", key="secret_id",
)
),
),
],
resources=k8s_client.V1ResourceRequirements(
requests={"cpu": "250m", "memory": "512Mi"},
limits={"cpu": "2000m", "memory": "2Gi"},
),
security_context=k8s_client.V1SecurityContext(
allow_privilege_escalation=False,
run_as_non_root=True,
run_as_user=1000,
capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
),
)
],
),
),
),
)
log.info("Spawning review PM job %s for task %d", name, task_id)
batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30)
def orchestrate_review_tasks(
vikunja_token: str,
batch_v1: k8s_client.BatchV1Api,
project_id: int,
view_id: int,
in_review_id: int,
) -> None:
"""
Scan the Review bucket. For each task that has a non-pm/non-reviewer agent
label and no review-pm job yet, spawn a PM agent to create review sub-tasks.
"""
page = 1
tasks = []
while True:
batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50)
if not batch:
break
tasks.extend(batch)
if len(batch) < 50:
break
page += 1
review_tasks = [
t for t in tasks
if not t.get("done") and t.get("bucket_id") == in_review_id
]
log.info("Review orchestration: checking %d tasks in Review bucket", len(review_tasks))
for task in review_tasks:
task_id = task["id"]
role = extract_agent_role(task)
if not role or role in REVIEW_SKIP_ROLES:
log.debug("Task %d role=%s skipped for review orchestration", task_id, role)
continue
spawn_review_pm_job(
batch_v1,
task_id=task_id,
task_title=task.get("title", f"Task {task_id}"),
in_review_bucket_id=in_review_id,
project_id=project_id,
view_id=view_id,
)
# ── Kubernetes ────────────────────────────────────────────────────────────────
def load_k8s_config() -> None:
@ -388,8 +663,9 @@ def main() -> None:
in_progress_id = buckets.get(BUCKET_IN_PROGRESS)
in_review_id = buckets.get(BUCKET_IN_REVIEW)
done_id = buckets.get(BUCKET_DONE)
backlog_id = buckets.get(BUCKET_BACKLOG)
if not all([todo_id, in_progress_id, in_review_id, done_id]):
if not all([todo_id, in_progress_id, in_review_id, done_id, backlog_id]):
log.error("Could not find all standard buckets. Found: %s", list(buckets.keys()))
sys.exit(1)
@ -397,10 +673,20 @@ def main() -> None:
load_k8s_config()
batch_v1 = k8s_client.BatchV1Api()
# Sweep: move any done=True tasks into the Done bucket
# 1. Sweep: move any done=True tasks into the Done bucket
sweep_done_tasks(vikunja_token, project_id, view_id, done_id)
# Scan Todo bucket for claimable tasks
# 2. Watchdog: requeue or escalate stale InProgress tasks
watchdog_stale_tasks(
vikunja_token, batch_v1,
project_id, view_id,
in_progress_id, todo_id, backlog_id,
)
# 3. Review orchestration: spawn PM jobs for tasks awaiting review
orchestrate_review_tasks(vikunja_token, batch_v1, project_id, view_id, in_review_id)
# 4. Scan Todo bucket for claimable tasks
tasks = list_todo_tasks(vikunja_token, project_id, todo_id)
log.info("Found %d candidate tasks in Todo bucket", len(tasks))