diff --git a/container/entrypoint.py b/container/entrypoint.py index 55931fb..727b4d5 100644 --- a/container/entrypoint.py +++ b/container/entrypoint.py @@ -164,6 +164,12 @@ Instructions: 5. Leave a comment on the Vikunja task summarising what you did and linking any PR. 6. Do not mark the task as done — the entrypoint will move it to In Review when you finish. 7. Do not ask for confirmation — act autonomously within your role constraints. + +IMPORTANT — avoid looping: +- If you attempt the same fix more than twice and it is still not working, STOP. +- Do not keep retrying the same approach hoping for a different result. +- Instead: leave a Vikunja comment explaining exactly what you tried and what is blocking you, then exit. +- A human will read it and unblock you. Spinning wastes time and money. """ diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 998096b..5551b15 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -3,9 +3,13 @@ AutoJanet Dispatcher Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by -name, resolves the kanban view and all 5 standard bucket IDs by name, then -claims tasks from the Todo bucket that have an `agent:` label and -spawns a Kubernetes Job for the appropriate agent. +name, resolves the kanban view and all 5 standard bucket IDs by name, then: + + 1. Sweeps done=True tasks into the Done bucket. + 2. Watchdog: moves stale InProgress tasks (dead job) back to Todo. + 3. Review orchestration: spawns a PM agent for each task in Review that + needs sub-tasks created (code-review, security-review, etc.). + 4. Claims tasks from the Todo bucket and spawns agent Jobs. Config (env vars): OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID @@ -45,6 +49,7 @@ AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojane # Max concurrent jobs per role (and global across all roles) MAX_JOBS_PER_ROLE = int(os.environ.get("MAX_JOBS_PER_ROLE", "2")) MAX_JOBS_TOTAL = int(os.environ.get("MAX_JOBS_TOTAL", "10")) +MAX_TASK_RETRIES = int(os.environ.get("MAX_TASK_RETRIES", "3")) # Standard bucket names (case-insensitive match) BUCKET_BACKLOG = "backlog" @@ -227,6 +232,276 @@ def sweep_done_tasks( log.info("Done sweep: moved %d tasks", moved) +# ── Role extraction ─────────────────────────────────────────────────────────── + +def extract_agent_role(task: dict) -> str | None: + """Return the agent role from an agent: label, or None.""" + for label in task.get("labels") or []: + title = label.get("title", "") + if title.startswith("agent:"): + role = title[len("agent:"):] + if role in VALID_ROLES: + return role + return None + + +# ── Stale watchdog ──────────────────────────────────────────────────────────── + +def count_jobs_for_task(batch_v1: k8s_client.BatchV1Api, task_id: int) -> tuple[int, int]: + """ + Return (active_count, total_ever) for all jobs with this task_id label. + active = job has no terminal outcome yet (still running or pending). + total_ever = ALL jobs ever spawned for this task, regardless of outcome. + Used to detect both dead-job stale and successful-but-looping scenarios. + """ + jobs = batch_v1.list_namespaced_job( + namespace=K8S_NAMESPACE, + label_selector=f"autojanet/task-id={task_id}", + _request_timeout=15, + ) + active = 0 + total_ever = len(jobs.items) + for job in jobs.items: + s = job.status + is_succeeded = (s.succeeded or 0) > 0 + is_failed = (s.failed or 0) >= (job.spec.backoff_limit or 1) + 1 + if not is_succeeded and not is_failed: + active += 1 + return active, total_ever + + +def post_stale_comment(vikunja_token: str, task_id: int, reason: str) -> None: + try: + httpx.put( + f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/comments", + headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"}, + json={"comment": reason}, + timeout=10, + ).raise_for_status() + except Exception as e: + log.warning("Failed to post stale comment on task %d: %s", task_id, e) + + +def watchdog_stale_tasks( + vikunja_token: str, + batch_v1: k8s_client.BatchV1Api, + project_id: int, + view_id: int, + in_progress_id: int, + todo_id: int, + backlog_id: int, +) -> None: + """ + For every task stuck in InProgress, check whether its Job is still alive. + - No job found or job terminally failed: + - attempt count < MAX_TASK_RETRIES → move back to Todo + - attempt count >= MAX_TASK_RETRIES → move to Backlog + comment + """ + page = 1 + tasks = [] + while True: + batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50) + if not batch: + break + tasks.extend(batch) + if len(batch) < 50: + break + page += 1 + + stale = [t for t in tasks if not t.get("done") and t.get("bucket_id") == in_progress_id] + log.info("Watchdog: checking %d InProgress tasks", len(stale)) + + for task in stale: + task_id = task["id"] + active, total_ever = count_jobs_for_task(batch_v1, task_id) + + if active > 0: + log.debug("Task %d has %d active job(s), leaving alone", task_id, active) + continue + + # No active job — task is stuck or looping + log.warning("Task %d is stale: active=%d total_attempts=%d", task_id, active, total_ever) + + if total_ever >= MAX_TASK_RETRIES: + log.error("Task %d hit retry limit (%d/%d), moving to Backlog", task_id, total_ever, MAX_TASK_RETRIES) + try: + vikunja_post( + vikunja_token, + f"projects/{project_id}/views/{view_id}/buckets/{backlog_id}/tasks", + {"task_id": task_id}, + ) + except Exception as e: + log.warning("Failed to move task %d to Backlog: %s", task_id, e) + post_stale_comment( + vikunja_token, task_id, + f"🚨 **Watchdog**: task has been attempted {total_ever} time(s) and hit the retry limit " + f"(`MAX_TASK_RETRIES={MAX_TASK_RETRIES}`). Moved to **Backlog** for manual review." + ) + else: + log.info("Task %d attempt %d/%d, requeueing to Todo", task_id, total_ever + 1, MAX_TASK_RETRIES) + try: + vikunja_post( + vikunja_token, + f"projects/{project_id}/views/{view_id}/buckets/{todo_id}/tasks", + {"task_id": task_id}, + ) + except Exception as e: + log.warning("Failed to requeue task %d to Todo: %s", task_id, e) + post_stale_comment( + vikunja_token, task_id, + f"⚠️ **Watchdog**: no active job found for this task (attempt {total_ever + 1}/{MAX_TASK_RETRIES}). " + f"Requeued to **Todo** for retry." + ) + + +# ── Review orchestration ────────────────────────────────────────────────────── + +# Roles that should NOT trigger review orchestration (would cause loops) +REVIEW_SKIP_ROLES = {"pm", "code-reviewer"} + + +def spawn_review_pm_job( + batch_v1: k8s_client.BatchV1Api, + task_id: int, + task_title: str, + in_review_bucket_id: int, + project_id: int, + view_id: int, +) -> None: + """Spawn a PM agent job to orchestrate review sub-tasks for a completed task.""" + name = f"review-pm-{task_id}" + if job_already_exists(batch_v1, name): + log.debug("Review PM job %s already exists, skipping", name) + return + + job = k8s_client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=k8s_client.V1ObjectMeta( + name=name, + namespace=K8S_NAMESPACE, + labels={ + "autojanet/type": "review-pm", + "autojanet/role": "pm", + "autojanet/task-id": str(task_id), + }, + ), + spec=k8s_client.V1JobSpec( + ttl_seconds_after_finished=3600, + backoff_limit=1, + template=k8s_client.V1PodTemplateSpec( + metadata=k8s_client.V1ObjectMeta( + labels={ + "autojanet/type": "review-pm", + "autojanet/role": "pm", + "autojanet/task-id": str(task_id), + } + ), + spec=k8s_client.V1PodSpec( + service_account_name="agent-pm", + restart_policy="Never", + node_selector={"kubernetes.io/arch": "amd64"}, + containers=[ + k8s_client.V1Container( + name="agent", + image=AGENT_IMAGE, + image_pull_policy="Always", + env=[ + k8s_client.V1EnvVar(name="AGENT_ROLE", value="pm"), + k8s_client.V1EnvVar(name="TASK_TYPE", value="review_orchestration"), + k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)), + k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title), + k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR), + k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL), + k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"), + k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"), + k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)), + k8s_client.V1EnvVar(name="VIKUNJA_PROJECT_ID", value=str(project_id)), + k8s_client.V1EnvVar(name="VIKUNJA_VIEW_ID", value=str(view_id)), + k8s_client.V1EnvVar( + name="OPENBAO_ROLE_ID", + value_from=k8s_client.V1EnvVarSource( + secret_key_ref=k8s_client.V1SecretKeySelector( + name="agent-pm-approle", key="role_id", + ) + ), + ), + k8s_client.V1EnvVar( + name="OPENBAO_SECRET_ID", + value_from=k8s_client.V1EnvVarSource( + secret_key_ref=k8s_client.V1SecretKeySelector( + name="agent-pm-approle", key="secret_id", + ) + ), + ), + ], + resources=k8s_client.V1ResourceRequirements( + requests={"cpu": "250m", "memory": "512Mi"}, + limits={"cpu": "2000m", "memory": "2Gi"}, + ), + security_context=k8s_client.V1SecurityContext( + allow_privilege_escalation=False, + run_as_non_root=True, + run_as_user=1000, + capabilities=k8s_client.V1Capabilities(drop=["ALL"]), + ), + ) + ], + ), + ), + ), + ) + + log.info("Spawning review PM job %s for task %d", name, task_id) + batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30) + + +def orchestrate_review_tasks( + vikunja_token: str, + batch_v1: k8s_client.BatchV1Api, + project_id: int, + view_id: int, + in_review_id: int, +) -> None: + """ + Scan the Review bucket. For each task that has a non-pm/non-reviewer agent + label and no review-pm job yet, spawn a PM agent to create review sub-tasks. + """ + page = 1 + tasks = [] + while True: + batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50) + if not batch: + break + tasks.extend(batch) + if len(batch) < 50: + break + page += 1 + + review_tasks = [ + t for t in tasks + if not t.get("done") and t.get("bucket_id") == in_review_id + ] + log.info("Review orchestration: checking %d tasks in Review bucket", len(review_tasks)) + + for task in review_tasks: + task_id = task["id"] + role = extract_agent_role(task) + + if not role or role in REVIEW_SKIP_ROLES: + log.debug("Task %d role=%s skipped for review orchestration", task_id, role) + continue + + spawn_review_pm_job( + batch_v1, + task_id=task_id, + task_title=task.get("title", f"Task {task_id}"), + in_review_bucket_id=in_review_id, + project_id=project_id, + view_id=view_id, + ) + + # ── Kubernetes ──────────────────────────────────────────────────────────────── def load_k8s_config() -> None: @@ -388,8 +663,9 @@ def main() -> None: in_progress_id = buckets.get(BUCKET_IN_PROGRESS) in_review_id = buckets.get(BUCKET_IN_REVIEW) done_id = buckets.get(BUCKET_DONE) + backlog_id = buckets.get(BUCKET_BACKLOG) - if not all([todo_id, in_progress_id, in_review_id, done_id]): + if not all([todo_id, in_progress_id, in_review_id, done_id, backlog_id]): log.error("Could not find all standard buckets. Found: %s", list(buckets.keys())) sys.exit(1) @@ -397,10 +673,20 @@ def main() -> None: load_k8s_config() batch_v1 = k8s_client.BatchV1Api() - # Sweep: move any done=True tasks into the Done bucket + # 1. Sweep: move any done=True tasks into the Done bucket sweep_done_tasks(vikunja_token, project_id, view_id, done_id) - # Scan Todo bucket for claimable tasks + # 2. Watchdog: requeue or escalate stale InProgress tasks + watchdog_stale_tasks( + vikunja_token, batch_v1, + project_id, view_id, + in_progress_id, todo_id, backlog_id, + ) + + # 3. Review orchestration: spawn PM jobs for tasks awaiting review + orchestrate_review_tasks(vikunja_token, batch_v1, project_id, view_id, in_review_id) + + # 4. Scan Todo bucket for claimable tasks tasks = list_todo_tasks(vikunja_token, project_id, todo_id) log.info("Found %d candidate tasks in Todo bucket", len(tasks))