autojanet/dispatcher/dispatcher.py
Zoë b0fb10706e
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
fix: use filter API for bucket scanning, bucket_id is view-local not on task object
2026-05-30 21:01:34 -07:00

716 lines
28 KiB
Python

#!/usr/bin/env python3
"""
AutoJanet Dispatcher
Runs as a CronJob every 2 minutes. Discovers the target Vikunja project by
name, resolves the kanban view and all 5 standard bucket IDs by name, then:
1. Sweeps done=True tasks into the Done bucket.
2. Watchdog: moves stale InProgress tasks (dead job) back to Todo.
3. Review orchestration: spawns a PM agent for each task in Review that
needs sub-tasks created (code-review, security-review, etc.).
4. Claims tasks from the Todo bucket and spawns agent Jobs.
Config (env vars):
OPENBAO_ADDR, OPENBAO_ROLE_ID, OPENBAO_SECRET_ID
VIKUNJA_BASE_URL (default: http://vikunja.vikunja.svc.cluster.local:3456)
VIKUNJA_PROJECT_NAME (default: Autonomous Agent Platform)
K8S_NAMESPACE (default: autojanet)
AGENT_IMAGE
"""
import logging
import os
import re
import sys
import httpx
from kubernetes import client as k8s_client, config as k8s_config
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
stream=sys.stdout,
)
log = logging.getLogger("dispatcher")
# ── Config ────────────────────────────────────────────────────────────────────
OPENBAO_ADDR = os.environ["OPENBAO_ADDR"]
OPENBAO_ROLE_ID = os.environ["OPENBAO_ROLE_ID"]
OPENBAO_SECRET_ID = os.environ["OPENBAO_SECRET_ID"]
VIKUNJA_BASE_URL = os.environ.get("VIKUNJA_BASE_URL", "http://vikunja.vikunja.svc.cluster.local:3456")
VIKUNJA_PROJECT_NAME = os.environ.get("VIKUNJA_PROJECT_NAME", "Autonomous Agent Platform")
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "autojanet")
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "registry.ctz.fyi/library/autojanet-agent:latest")
# Max concurrent jobs per role (and global across all roles)
MAX_JOBS_PER_ROLE = int(os.environ.get("MAX_JOBS_PER_ROLE", "2"))
MAX_JOBS_TOTAL = int(os.environ.get("MAX_JOBS_TOTAL", "10"))
MAX_TASK_RETRIES = int(os.environ.get("MAX_TASK_RETRIES", "3"))
# Standard bucket names (case-insensitive match)
BUCKET_BACKLOG = "backlog"
BUCKET_TODO = "todo"
BUCKET_IN_PROGRESS = "in progress"
BUCKET_IN_REVIEW = "review"
BUCKET_DONE = "done"
VALID_ROLES = {
"pm", "coder", "code-reviewer", "test-engineer", "devsecops", "secops",
"sre", "kubernetes-pilot", "linux-admin", "systems-engineer", "networking",
"dba", "prometheus-expert", "tofu-engineer", "release-manager",
"doc-updater", "doc-writer", "technical-writer", "cost-optimizer",
}
# ── OpenBao ───────────────────────────────────────────────────────────────────
def get_openbao_token() -> str:
resp = httpx.post(
f"{OPENBAO_ADDR}/v1/auth/approle/login",
json={"role_id": OPENBAO_ROLE_ID, "secret_id": OPENBAO_SECRET_ID},
timeout=10,
)
resp.raise_for_status()
return resp.json()["auth"]["client_token"]
def get_secret(bao_token: str, path: str, key: str) -> str:
resp = httpx.get(
f"{OPENBAO_ADDR}/v1/secret/data/{path}",
headers={"X-Vault-Token": bao_token},
timeout=10,
)
resp.raise_for_status()
return resp.json()["data"]["data"][key]
def get_vikunja_token(bao_token: str) -> str:
return get_secret(bao_token, "autojanet/pm/vikunja-token", "token")
# ── Vikunja discovery ─────────────────────────────────────────────────────────
def vikunja_get(vikunja_token: str, path: str, **params) -> list | dict:
resp = httpx.get(
f"{VIKUNJA_BASE_URL}/api/v1/{path}",
headers={"Authorization": f"Bearer {vikunja_token}"},
params=params or None,
timeout=15,
)
resp.raise_for_status()
return resp.json()
def find_project_id(vikunja_token: str, project_name: str) -> int:
"""Find project ID by exact name match."""
page = 1
while True:
projects = vikunja_get(vikunja_token, "projects", page=page, per_page=50)
if not projects:
break
for p in projects:
if p.get("title", "").strip().lower() == project_name.strip().lower():
log.info("Found project '%s' id=%d", project_name, p["id"])
return p["id"]
if len(projects) < 50:
break
page += 1
raise RuntimeError(f"Project '{project_name}' not found in Vikunja")
def find_kanban_view_id(vikunja_token: str, project_id: int) -> int:
"""Find the kanban view for a project."""
views = vikunja_get(vikunja_token, f"projects/{project_id}/views")
for v in views:
if v.get("view_kind") == "kanban":
log.info("Found kanban view id=%d", v["id"])
return v["id"]
raise RuntimeError(f"No kanban view found for project {project_id}")
def discover_buckets(vikunja_token: str, project_id: int, view_id: int) -> dict[str, int]:
"""
Return a dict mapping normalised bucket name -> bucket id.
Standard names: backlog, todo, in progress, in review, done
"""
buckets = vikunja_get(vikunja_token, f"projects/{project_id}/views/{view_id}/buckets")
mapping = {}
for b in buckets:
name = b.get("title", "").strip().lower()
mapping[name] = b["id"]
log.info("Bucket '%s' id=%d", b["title"], b["id"])
return mapping
def vikunja_post(vikunja_token: str, path: str, body: dict) -> dict:
resp = httpx.post(
f"{VIKUNJA_BASE_URL}/api/v1/{path}",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json=body,
timeout=15,
)
resp.raise_for_status()
return resp.json()
def list_tasks_in_bucket(vikunja_token: str, project_id: int, bucket_id: int) -> list[dict]:
"""Return all undone tasks in a specific bucket using the filter API."""
tasks = []
page = 1
while True:
batch = vikunja_get(
vikunja_token,
f"projects/{project_id}/tasks",
page=page,
per_page=50,
filter=f"bucket_id = {bucket_id}",
)
if not batch:
break
tasks.extend(batch)
if len(batch) < 50:
break
page += 1
return [t for t in tasks if not t.get("done")]
def list_todo_tasks(vikunja_token: str, project_id: int, todo_id: int) -> list[dict]:
"""Return all undone tasks in the Todo bucket that have agent labels."""
return [
t for t in list_tasks_in_bucket(vikunja_token, project_id, todo_id)
if t.get("labels")
]
def claim_task(vikunja_token: str, task_id: int, in_progress_id: int) -> bool:
"""Move task from Todo → In Progress."""
try:
vikunja_post(vikunja_token, f"tasks/{task_id}", {"bucket_id": in_progress_id})
log.info("Moved task %d → In Progress (bucket %d)", task_id, in_progress_id)
return True
except Exception as e:
log.error("Failed to claim task %d: %s", task_id, e)
return False
def unclaim_task(vikunja_token: str, task_id: int, todo_id: int) -> None:
"""Move task back to Todo on job spawn failure."""
try:
vikunja_post(vikunja_token, f"tasks/{task_id}", {"bucket_id": todo_id})
log.info("Unclaimed task %d → Todo", task_id)
except Exception as e:
log.warning("Failed to unclaim task %d: %s", task_id, e)
def sweep_done_tasks(
vikunja_token: str,
project_id: int,
view_id: int,
done_bucket_id: int,
) -> None:
"""Find tasks marked done=True that aren't in the Done bucket and move them."""
tasks = []
page = 1
while True:
batch = vikunja_get(vikunja_token, f"projects/{project_id}/tasks", page=page, per_page=50)
if not batch:
break
tasks.extend(batch)
if len(batch) < 50:
break
page += 1
moved = 0
for task in tasks:
if task.get("done"):
task_id = task["id"]
try:
vikunja_post(
vikunja_token,
f"projects/{project_id}/views/{view_id}/buckets/{done_bucket_id}/tasks",
{"task_id": task_id},
)
log.info("Swept done task %d → Done bucket", task_id)
moved += 1
except Exception as e:
log.warning("Failed to sweep task %d to Done: %s", task_id, e)
log.info("Done sweep: moved %d tasks", moved)
# ── Role extraction ───────────────────────────────────────────────────────────
def extract_agent_role(task: dict) -> str | None:
"""Return the agent role from an agent:<role> label, or None."""
for label in task.get("labels") or []:
title = label.get("title", "")
if title.startswith("agent:"):
role = title[len("agent:"):]
if role in VALID_ROLES:
return role
return None
# ── Stale watchdog ────────────────────────────────────────────────────────────
def count_jobs_for_task(batch_v1: k8s_client.BatchV1Api, task_id: int) -> tuple[int, int]:
"""
Return (active_count, total_ever) for all jobs with this task_id label.
active = job has no terminal outcome yet (still running or pending).
total_ever = ALL jobs ever spawned for this task, regardless of outcome.
Used to detect both dead-job stale and successful-but-looping scenarios.
"""
jobs = batch_v1.list_namespaced_job(
namespace=K8S_NAMESPACE,
label_selector=f"autojanet/task-id={task_id}",
_request_timeout=15,
)
active = 0
total_ever = len(jobs.items)
for job in jobs.items:
s = job.status
is_succeeded = (s.succeeded or 0) > 0
is_failed = (s.failed or 0) >= (job.spec.backoff_limit or 1) + 1
if not is_succeeded and not is_failed:
active += 1
return active, total_ever
def post_stale_comment(vikunja_token: str, task_id: int, reason: str) -> None:
try:
httpx.put(
f"{VIKUNJA_BASE_URL}/api/v1/tasks/{task_id}/comments",
headers={"Authorization": f"Bearer {vikunja_token}", "Content-Type": "application/json"},
json={"comment": reason},
timeout=10,
).raise_for_status()
except Exception as e:
log.warning("Failed to post stale comment on task %d: %s", task_id, e)
def watchdog_stale_tasks(
vikunja_token: str,
batch_v1: k8s_client.BatchV1Api,
project_id: int,
view_id: int,
in_progress_id: int,
todo_id: int,
backlog_id: int,
) -> None:
"""
For every task stuck in InProgress, check whether its Job is still alive.
- No job found or job terminally failed:
- attempt count < MAX_TASK_RETRIES → move back to Todo
- attempt count >= MAX_TASK_RETRIES → move to Backlog + comment
"""
stale = list_tasks_in_bucket(vikunja_token, project_id, in_progress_id)
log.info("Watchdog: checking %d InProgress tasks", len(stale))
for task in stale:
task_id = task["id"]
active, total_ever = count_jobs_for_task(batch_v1, task_id)
if active > 0:
log.debug("Task %d has %d active job(s), leaving alone", task_id, active)
continue
# No active job — task is stuck or looping
log.warning("Task %d is stale: active=%d total_attempts=%d", task_id, active, total_ever)
if total_ever >= MAX_TASK_RETRIES:
log.error("Task %d hit retry limit (%d/%d), moving to Backlog", task_id, total_ever, MAX_TASK_RETRIES)
try:
vikunja_post(
vikunja_token,
f"projects/{project_id}/views/{view_id}/buckets/{backlog_id}/tasks",
{"task_id": task_id},
)
except Exception as e:
log.warning("Failed to move task %d to Backlog: %s", task_id, e)
post_stale_comment(
vikunja_token, task_id,
f"🚨 **Watchdog**: task has been attempted {total_ever} time(s) and hit the retry limit "
f"(`MAX_TASK_RETRIES={MAX_TASK_RETRIES}`). Moved to **Backlog** for manual review."
)
else:
log.info("Task %d attempt %d/%d, requeueing to Todo", task_id, total_ever + 1, MAX_TASK_RETRIES)
try:
vikunja_post(
vikunja_token,
f"projects/{project_id}/views/{view_id}/buckets/{todo_id}/tasks",
{"task_id": task_id},
)
except Exception as e:
log.warning("Failed to requeue task %d to Todo: %s", task_id, e)
post_stale_comment(
vikunja_token, task_id,
f"⚠️ **Watchdog**: no active job found for this task (attempt {total_ever + 1}/{MAX_TASK_RETRIES}). "
f"Requeued to **Todo** for retry."
)
# ── Review orchestration ──────────────────────────────────────────────────────
# Roles that should NOT trigger review orchestration (would cause loops)
REVIEW_SKIP_ROLES = {"pm", "code-reviewer"}
def spawn_review_pm_job(
batch_v1: k8s_client.BatchV1Api,
task_id: int,
task_title: str,
in_review_bucket_id: int,
project_id: int,
view_id: int,
) -> None:
"""Spawn a PM agent job to orchestrate review sub-tasks for a completed task."""
name = f"review-pm-{task_id}"
if job_already_exists(batch_v1, name):
log.debug("Review PM job %s already exists, skipping", name)
return
job = k8s_client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=k8s_client.V1ObjectMeta(
name=name,
namespace=K8S_NAMESPACE,
labels={
"autojanet/type": "review-pm",
"autojanet/role": "pm",
"autojanet/task-id": str(task_id),
},
),
spec=k8s_client.V1JobSpec(
ttl_seconds_after_finished=3600,
backoff_limit=1,
template=k8s_client.V1PodTemplateSpec(
metadata=k8s_client.V1ObjectMeta(
labels={
"autojanet/type": "review-pm",
"autojanet/role": "pm",
"autojanet/task-id": str(task_id),
}
),
spec=k8s_client.V1PodSpec(
service_account_name="agent-pm",
restart_policy="Never",
node_selector={"kubernetes.io/arch": "amd64"},
containers=[
k8s_client.V1Container(
name="agent",
image=AGENT_IMAGE,
image_pull_policy="Always",
env=[
k8s_client.V1EnvVar(name="AGENT_ROLE", value="pm"),
k8s_client.V1EnvVar(name="TASK_TYPE", value="review_orchestration"),
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)),
k8s_client.V1EnvVar(name="VIKUNJA_PROJECT_ID", value=str(project_id)),
k8s_client.V1EnvVar(name="VIKUNJA_VIEW_ID", value=str(view_id)),
k8s_client.V1EnvVar(
name="OPENBAO_ROLE_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name="agent-pm-approle", key="role_id",
)
),
),
k8s_client.V1EnvVar(
name="OPENBAO_SECRET_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name="agent-pm-approle", key="secret_id",
)
),
),
],
resources=k8s_client.V1ResourceRequirements(
requests={"cpu": "250m", "memory": "512Mi"},
limits={"cpu": "2000m", "memory": "2Gi"},
),
security_context=k8s_client.V1SecurityContext(
allow_privilege_escalation=False,
run_as_non_root=True,
run_as_user=1000,
capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
),
)
],
),
),
),
)
log.info("Spawning review PM job %s for task %d", name, task_id)
batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30)
def orchestrate_review_tasks(
vikunja_token: str,
batch_v1: k8s_client.BatchV1Api,
project_id: int,
view_id: int,
in_review_id: int,
) -> None:
"""
Scan the Review bucket. For each task that has a non-pm/non-reviewer agent
label and no review-pm job yet, spawn a PM agent to create review sub-tasks.
"""
review_tasks = list_tasks_in_bucket(vikunja_token, project_id, in_review_id)
log.info("Review orchestration: checking %d tasks in Review bucket", len(review_tasks))
for task in review_tasks:
task_id = task["id"]
role = extract_agent_role(task)
if not role or role in REVIEW_SKIP_ROLES:
log.debug("Task %d role=%s skipped for review orchestration", task_id, role)
continue
spawn_review_pm_job(
batch_v1,
task_id=task_id,
task_title=task.get("title", f"Task {task_id}"),
in_review_bucket_id=in_review_id,
project_id=project_id,
view_id=view_id,
)
# ── Kubernetes ────────────────────────────────────────────────────────────────
def load_k8s_config() -> None:
try:
k8s_config.load_incluster_config()
except k8s_config.ConfigException:
k8s_config.load_kube_config()
def count_active_jobs(batch_v1: k8s_client.BatchV1Api) -> tuple[int, dict[str, int]]:
"""Return (total_active, {role: count}) for all non-completed agent jobs."""
jobs = batch_v1.list_namespaced_job(
namespace=K8S_NAMESPACE,
label_selector="autojanet/type=agent",
_request_timeout=15,
)
total = 0
by_role: dict[str, int] = {}
for job in jobs.items:
# Skip completed/failed jobs
status = job.status
if (status.succeeded or 0) > 0 or (status.failed or 0) >= (job.spec.backoff_limit or 1) + 1:
continue
total += 1
role = job.metadata.labels.get("autojanet/role", "unknown")
by_role[role] = by_role.get(role, 0) + 1
return total, by_role
def job_name(role: str, task_id: int) -> str:
safe_role = role.replace("-", "")[:12]
return f"agent-{safe_role}-{task_id}"
def job_already_exists(batch_v1: k8s_client.BatchV1Api, name: str) -> bool:
try:
batch_v1.read_namespaced_job(name=name, namespace=K8S_NAMESPACE)
return True
except k8s_client.ApiException as e:
if e.status == 404:
return False
raise
def spawn_agent_job(
batch_v1: k8s_client.BatchV1Api,
role: str,
task_id: int,
task_title: str,
in_review_bucket_id: int,
project_id: int,
view_id: int,
) -> None:
name = job_name(role, task_id)
if job_already_exists(batch_v1, name):
log.info("Job %s already exists, skipping", name)
return
job = k8s_client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=k8s_client.V1ObjectMeta(
name=name,
namespace=K8S_NAMESPACE,
labels={
"autojanet/type": "agent",
"autojanet/role": role,
"autojanet/task-id": str(task_id),
},
),
spec=k8s_client.V1JobSpec(
ttl_seconds_after_finished=3600,
backoff_limit=1,
template=k8s_client.V1PodTemplateSpec(
metadata=k8s_client.V1ObjectMeta(
labels={
"autojanet/type": "agent",
"autojanet/role": role,
"autojanet/task-id": str(task_id),
}
),
spec=k8s_client.V1PodSpec(
service_account_name=f"agent-{role}",
restart_policy="Never",
node_selector={"kubernetes.io/arch": "amd64"},
containers=[
k8s_client.V1Container(
name="agent",
image=AGENT_IMAGE,
image_pull_policy="Always",
env=[
k8s_client.V1EnvVar(name="AGENT_ROLE", value=role),
k8s_client.V1EnvVar(name="TASK_ID", value=str(task_id)),
k8s_client.V1EnvVar(name="TASK_TITLE", value=task_title),
k8s_client.V1EnvVar(name="OPENBAO_ADDR", value=OPENBAO_ADDR),
k8s_client.V1EnvVar(name="VIKUNJA_BASE_URL", value=VIKUNJA_BASE_URL),
k8s_client.V1EnvVar(name="LITELLM_BASE_URL", value="https://llm.ctz.fyi"),
k8s_client.V1EnvVar(name="FORGEJO_BASE_URL", value="https://git.ctz.fyi"),
k8s_client.V1EnvVar(name="IN_REVIEW_BUCKET_ID", value=str(in_review_bucket_id)),
k8s_client.V1EnvVar(name="VIKUNJA_PROJECT_ID", value=str(project_id)),
k8s_client.V1EnvVar(name="VIKUNJA_VIEW_ID", value=str(view_id)),
k8s_client.V1EnvVar(
name="OPENBAO_ROLE_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=f"agent-{role}-approle",
key="role_id",
)
),
),
k8s_client.V1EnvVar(
name="OPENBAO_SECRET_ID",
value_from=k8s_client.V1EnvVarSource(
secret_key_ref=k8s_client.V1SecretKeySelector(
name=f"agent-{role}-approle",
key="secret_id",
)
),
),
],
resources=k8s_client.V1ResourceRequirements(
requests={"cpu": "250m", "memory": "512Mi"},
limits={"cpu": "2000m", "memory": "2Gi"},
),
security_context=k8s_client.V1SecurityContext(
allow_privilege_escalation=False,
run_as_non_root=True,
run_as_user=1000,
capabilities=k8s_client.V1Capabilities(drop=["ALL"]),
),
)
],
),
),
),
)
log.info("Creating k8s job %s", name)
batch_v1.create_namespaced_job(namespace=K8S_NAMESPACE, body=job, _request_timeout=30)
log.info("Spawned job %s for role=%s task=%d", name, role, task_id)
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
log.info("Dispatcher starting")
# Auth
bao_token = get_openbao_token()
vikunja_token = get_vikunja_token(bao_token)
log.info("Authenticated to OpenBao and Vikunja")
# Discover project + kanban view + buckets by name
project_id = find_project_id(vikunja_token, VIKUNJA_PROJECT_NAME)
view_id = find_kanban_view_id(vikunja_token, project_id)
buckets = discover_buckets(vikunja_token, project_id, view_id)
todo_id = buckets.get(BUCKET_TODO)
in_progress_id = buckets.get(BUCKET_IN_PROGRESS)
in_review_id = buckets.get(BUCKET_IN_REVIEW)
done_id = buckets.get(BUCKET_DONE)
backlog_id = buckets.get(BUCKET_BACKLOG)
if not all([todo_id, in_progress_id, in_review_id, done_id, backlog_id]):
log.error("Could not find all standard buckets. Found: %s", list(buckets.keys()))
sys.exit(1)
# k8s
load_k8s_config()
batch_v1 = k8s_client.BatchV1Api()
# 1. Sweep: move any done=True tasks into the Done bucket
sweep_done_tasks(vikunja_token, project_id, view_id, done_id)
# 2. Watchdog: requeue or escalate stale InProgress tasks
watchdog_stale_tasks(
vikunja_token, batch_v1,
project_id, view_id,
in_progress_id, todo_id, backlog_id,
)
# 3. Review orchestration: spawn PM jobs for tasks awaiting review
orchestrate_review_tasks(vikunja_token, batch_v1, project_id, view_id, in_review_id)
# 4. Scan Todo bucket for claimable tasks
tasks = list_todo_tasks(vikunja_token, project_id, todo_id)
log.info("Found %d candidate tasks in Todo bucket", len(tasks))
# Check current job counts before spawning anything
total_active, active_by_role = count_active_jobs(batch_v1)
log.info("Active jobs: total=%d limits(per_role=%d, total=%d)", total_active, MAX_JOBS_PER_ROLE, MAX_JOBS_TOTAL)
claimed = 0
for task in tasks:
task_id = task["id"]
title = task.get("title", "")
role = extract_agent_role(task)
if not role:
log.debug("Task %d has no valid agent label, skipping", task_id)
continue
# Enforce concurrency limits
if total_active >= MAX_JOBS_TOTAL:
log.info("Global job limit reached (%d/%d), stopping", total_active, MAX_JOBS_TOTAL)
break
if active_by_role.get(role, 0) >= MAX_JOBS_PER_ROLE:
log.info("Role %s at limit (%d/%d), skipping task %d", role, active_by_role.get(role, 0), MAX_JOBS_PER_ROLE, task_id)
continue
log.info("Claiming task %d (%s) for role=%s", task_id, title[:60], role)
if not claim_task(vikunja_token, task_id, in_progress_id):
continue
try:
spawn_agent_job(batch_v1, role, task_id, title, in_review_id, project_id, view_id)
claimed += 1
total_active += 1
active_by_role[role] = active_by_role.get(role, 0) + 1
except Exception as e:
log.error("Failed to spawn job for task %d: %s", task_id, e)
unclaim_task(vikunja_token, task_id, todo_id)
log.info("Dispatcher done. Claimed %d tasks.", claimed)
if __name__ == "__main__":
main()