Files
agent-aide/aide-program/aide/flow/tracker.py
2025-12-19 03:58:34 +08:00

362 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FlowTracker编排一次 flow 动作(校验 → hooks → 落盘 → git → 输出)。"""
from __future__ import annotations
from pathlib import Path
from aide.core import output
from aide.core.config import ConfigManager
from aide.flow.branch import BranchManager
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.hooks import run_post_commit_hooks, run_pre_commit_hooks
from aide.flow.storage import FlowStorage
from aide.flow.types import FlowStatus, HistoryEntry
from aide.flow.utils import now_iso, now_task_id, normalize_text
from aide.flow.validator import FlowValidator
# Fallback phase list, used when the loaded config has no usable
# ``flow.phases`` entry.
DEFAULT_PHASES = [
    "task-optimize",
    "flow-design",
    "impl",
    "verify",
    "docs",
    "finish",
]
class FlowTracker:
    """Coordinate one flow action: validate, run hooks, persist, commit, report.

    Every public method returns a bool. A ``FlowError`` raised while handling
    an action is caught, reported through ``output.err`` and turned into a
    ``False`` return value instead of propagating to the caller.
    """

    def __init__(self, root: Path, cfg: ConfigManager):
        """Create the storage, git and branch collaborators for ``root``."""
        self.root = root
        self.cfg = cfg
        self.storage = FlowStorage(root)
        self.git = GitIntegration(root)
        self.branch_mgr = BranchManager(root, self.git, cfg)

    # ------------------------------------------------------------------
    # Public actions (thin wrappers around ``_run``)
    # ------------------------------------------------------------------

    def start(self, phase: str, summary: str) -> bool:
        """Run the "start" action targeting ``phase``."""
        return self._run(action="start", to_phase=phase, text=summary)

    def next_step(self, summary: str) -> bool:
        """Run the "next-step" action in the current phase."""
        return self._run(action="next-step", to_phase=None, text=summary)

    def back_step(self, reason: str) -> bool:
        """Run the "back-step" action in the current phase."""
        return self._run(action="back-step", to_phase=None, text=reason)

    def next_part(self, phase: str, summary: str) -> bool:
        """Run the "next-part" action, moving to ``phase``."""
        return self._run(action="next-part", to_phase=phase, text=summary)

    def back_part(self, phase: str, reason: str) -> bool:
        """Request a rework back to ``phase``; the move itself needs a confirmation.

        When a pending request is already stored, its details are printed and
        ``False`` is returned. Otherwise the request is saved, the returned
        confirmation key is printed together with the preparation steps, and
        ``True`` is returned. The actual "back-part" action only happens in
        ``back_confirm``.
        """
        try:
            self.storage.ensure_ready()

            # Only load the stored request when storage says one exists.
            pending = None
            if self.storage.has_pending_back_confirm():
                pending = self.storage.load_back_confirm_state()

            if pending:
                output.warn("已存在待确认的返工请求")
                output.info(f"目标环节: {pending['target_part']}")
                output.info(f"原因: {pending['reason']}")
                output.info(f"请执行: aide flow back-confirm --key {pending['pending_key']}")
                return False

            # No usable pending request: store a new one and obtain its key.
            key = self.storage.save_back_confirm_state(phase, reason)
            output.warn("返工需要确认。请先完成以下准备工作:")
            instructions = (
                "1. 触发 rework skill 学习返工流程指南",
                "2. 按照指南更新任务文档(记录返工原因和新需求)",
                "3. 完成准备工作后执行:",
                f" aide flow back-confirm --key {key}",
            )
            for line in instructions:
                output.info(line)
            return True
        except FlowError as exc:
            output.err(str(exc))
            return False

    def back_confirm(self, key: str) -> bool:
        """Confirm a pending rework request with ``key`` and execute it.

        Returns ``False`` when there is no pending request or the key does not
        match; otherwise returns the result of the "back-part" action.
        """
        try:
            self.storage.ensure_ready()

            pending = self.storage.load_back_confirm_state()
            if pending is None:
                output.err("无待确认的返工请求")
                return False
            if pending.get("pending_key") != key:
                output.err("确认 key 不匹配")
                return False

            target_part = pending["target_part"]
            reason = pending["reason"]

            # The stored request is removed before the action itself runs.
            self.storage.clear_back_confirm_state()

            succeeded = self._run(action="back-part", to_phase=target_part, text=reason)
            if succeeded:
                output.warn("建议执行 /exit 重新开始对话")
            return succeeded
        except FlowError as exc:
            output.err(str(exc))
            return False

    def issue(self, description: str) -> bool:
        """Run the "issue" action in the current phase."""
        return self._run(action="issue", to_phase=None, text=description)

    def error(self, description: str) -> bool:
        """Run the "error" action in the current phase."""
        return self._run(action="error", to_phase=None, text=description)

    def clean(self) -> bool:
        """Force-clean the current task.

        Precondition: the working tree must be clean.

        Returns the success flag reported by the branch manager, or ``False``
        when there is no active task or a ``FlowError`` occurs.
        """
        try:
            self.storage.ensure_ready()
            with self.storage.lock():
                if self.storage.load_status() is None:
                    output.err("未找到活跃任务,无需清理")
                    return False

                cleaned, msg = self.branch_mgr.clean_branch_merge()
                if cleaned:
                    output.ok(f"强制清理完成: {msg}")
                elif "工作区不干净" in msg or "未找到活跃" in msg:
                    output.err(msg)
                else:
                    # Temporary-branch case: needs manual handling.
                    output.warn(msg)
                return cleaned
        except FlowError as exc:
            output.err(str(exc))
            return False

    # ------------------------------------------------------------------
    # Internal machinery
    # ------------------------------------------------------------------

    def _run(self, *, action: str, to_phase: str | None, text: str) -> bool:
        """Execute one flow action while holding the storage lock.

        Args:
            action: Action name ("start", "next-step", "back-step",
                "next-part", "back-part", "issue" or "error").
            to_phase: Target phase. Used by "start", "next-part" and
                "back-part"; every other action stays in the current phase.
            text: Summary / reason / description. Must be non-empty after
                ``normalize_text``.

        Returns:
            ``True`` once the action has been recorded, ``False`` when a
            ``FlowError`` was raised (it is reported through ``output.err``).
        """
        try:
            self.storage.ensure_ready()
            config = self.cfg.load_config()
            validator = FlowValidator(_get_phases(config))

            summary = normalize_text(text)
            if not summary:
                raise FlowError("文本参数不能为空")

            with self.storage.lock():
                if action == "start":
                    self._begin_task(to_phase=to_phase, summary=summary, validator=validator)
                else:
                    self._record_action(
                        action=action,
                        to_phase=to_phase,
                        summary=summary,
                        validator=validator,
                    )
                return True
        except FlowError as exc:
            output.err(str(exc))
            return False

    def _begin_task(
        self,
        *,
        to_phase: str | None,
        summary: str,
        validator: FlowValidator,
    ) -> None:
        """Handle "start": archive old status, create the branch, record step 1."""
        action = "start"
        assert to_phase is not None
        validator.validate_start(to_phase)
        self.storage.archive_existing_status()

        # Create the task branch, then read back what the branch manager recorded.
        task_id = now_task_id()
        task_branch = self.branch_mgr.create_task_branch(
            task_id=task_id,
            task_summary=summary,
        )
        branch_info = self.branch_mgr.get_active_branch_info()

        initial = FlowStatus(
            task_id=task_id,
            current_phase=to_phase,
            current_step=0,
            started_at=now_iso(),
            history=[],
            source_branch=branch_info.source_branch if branch_info else None,
            start_commit=branch_info.start_commit if branch_info else None,
            task_branch=task_branch,
        )
        first_status, message = self._apply_action(
            status=initial,
            action=action,
            from_phase=None,
            to_phase=to_phase,
            text=summary,
            validator=validator,
        )
        self._commit_and_persist(first_status, message)

        output.ok(f"任务开始: {to_phase} (分支: {task_branch})")
        run_post_commit_hooks(to_phase=to_phase, action=action)

    def _record_action(
        self,
        *,
        action: str,
        to_phase: str | None,
        summary: str,
        validator: FlowValidator,
    ) -> None:
        """Handle every non-"start" action against the stored flow status."""
        current = self.storage.load_status()
        if current is None:
            raise FlowError("未找到流程状态请先运行aide flow start <环节名> \"<总结>\"")

        from_phase = current.current_phase
        validator.validate_phase_exists(from_phase)

        if action == "next-part":
            assert to_phase is not None
            validator.validate_next_part(from_phase, to_phase)
        elif action == "back-part":
            assert to_phase is not None
            validator.validate_back_part(from_phase, to_phase)
        else:
            # Step-level actions never leave the current phase.
            to_phase = from_phase

        next_status, message = self._apply_action(
            status=current,
            action=action,
            from_phase=from_phase,
            to_phase=to_phase,
            text=summary,
            validator=validator,
        )
        final_status = self._commit_and_persist(next_status, message)

        # Entering "finish" triggers the branch merge; it must run after the commit.
        if action == "next-part" and to_phase == "finish":
            self._merge_finished_task(final_status, summary)

        if action == "next-part":
            output.ok(f"进入环节: {to_phase}")
        elif action == "back-part":
            output.warn(f"回退到环节: {to_phase}")
        elif action == "error":
            output.err(f"错误已记录: {summary}")

        run_post_commit_hooks(to_phase=to_phase, action=action)

    def _commit_and_persist(self, status: FlowStatus, message: str) -> FlowStatus:
        """Save ``status``, commit, then save again with the commit hash filled in."""
        # Save the status before running git, then once more after the commit.
        self.storage.save_status(status)
        committed = self._do_git_commit(status, message)
        self.storage.save_status(committed)
        return committed

    def _merge_finished_task(self, final_status: FlowStatus, summary: str) -> None:
        """Call the branch manager's finish merge and warn when it reports failure."""
        # The finish commit is the last history entry; its hash is the end_commit.
        history = final_status.history
        last_entry = history[-1] if history else None
        merged, merge_msg = self.branch_mgr.finish_branch_merge(
            task_summary=summary,
            end_commit=None if last_entry is None else last_entry.git_commit,
            finished_at=None if last_entry is None else last_entry.timestamp,
        )
        if not merged:
            output.warn(merge_msg)

    def _apply_action(
        self,
        *,
        status: FlowStatus,
        action: str,
        from_phase: str | None,
        to_phase: str,
        text: str,
        validator: FlowValidator,
    ) -> tuple[FlowStatus, str]:
        """Apply ``action`` to ``status`` without touching git.

        Validates the transition, runs the pre-commit hooks, and appends a new
        history entry whose ``git_commit`` is still ``None``.

        Returns:
            ``(updated_status, commit_message)``.

        Raises:
            FlowError: if a phase-changing action arrives without ``from_phase``.
                Validator and hook failures propagate from the called helpers.
        """
        changes_phase = action in {"next-part", "back-part"}
        if changes_phase and from_phase is None:
            raise FlowError("内部错误:缺少 from_phase")

        if action == "next-part":
            assert from_phase is not None
            validator.validate_next_part(from_phase, to_phase)
        elif action == "back-part":
            assert from_phase is not None
            validator.validate_back_part(from_phase, to_phase)
        elif action == "start":
            validator.validate_start(to_phase)
        else:
            validator.validate_phase_exists(to_phase)

        hook_config = self.cfg.load_config()
        run_pre_commit_hooks(
            root=self.root,
            git=self.git,
            status=status,
            from_phase=from_phase,
            to_phase=to_phase,
            action=action,
            config=hook_config,
        )

        commit_message = _build_commit_message(action=action, phase=to_phase, text=text)
        step_number = status.current_step + 1
        new_entry = HistoryEntry(
            timestamp=now_iso(),
            action=action,
            phase=to_phase,
            step=step_number,
            summary=text,
            git_commit=None,  # filled in after the git commit has been made
        )
        advanced = FlowStatus(
            task_id=status.task_id,
            current_phase=to_phase,
            current_step=step_number,
            started_at=status.started_at,
            history=[*status.history, new_entry],
            source_branch=status.source_branch,
            start_commit=status.start_commit,
            task_branch=status.task_branch,
        )
        return advanced, commit_message

    def _do_git_commit(self, status: FlowStatus, message: str) -> FlowStatus:
        """Run ``git add`` + commit and record the hash on the last history entry.

        Returns a new ``FlowStatus`` whose final history entry carries the
        commit hash; when the history is empty, ``status`` is returned as-is.
        """
        self.git.add_all()
        commit_hash = self.git.commit(message)

        if not status.history:
            return status

        # Rebuild the last entry with the hash; earlier entries are kept unchanged.
        *earlier, latest = status.history
        stamped = HistoryEntry(
            timestamp=latest.timestamp,
            action=latest.action,
            phase=latest.phase,
            step=latest.step,
            summary=latest.summary,
            git_commit=commit_hash,
        )
        return FlowStatus(
            task_id=status.task_id,
            current_phase=status.current_phase,
            current_step=status.current_step,
            started_at=status.started_at,
            history=[*earlier, stamped],
            source_branch=status.source_branch,
            start_commit=status.start_commit,
            task_branch=status.task_branch,
        )
def _get_phases(config: dict) -> list[str]:
    """Return the phase list from ``config["flow"]["phases"]``, else the defaults.

    ``DEFAULT_PHASES`` is used when the "flow" section is missing or is not a
    mapping, or when "phases" is missing, not a list, or empty.

    The result is always a fresh list, so callers cannot mutate
    ``DEFAULT_PHASES`` or the loaded config through the returned value.

    Args:
        config: The loaded configuration mapping.

    Returns:
        A new list containing the configured (or default) phase names.
    """
    from collections.abc import Mapping

    flow_cfg = config.get("flow")
    # A malformed section (e.g. None or a string) must not crash with
    # AttributeError; treat it the same as a missing section.
    if not isinstance(flow_cfg, Mapping):
        return list(DEFAULT_PHASES)

    phases = flow_cfg.get("phases")
    if not isinstance(phases, list) or not phases:
        return list(DEFAULT_PHASES)
    return list(phases)
def _build_commit_message(*, action: str, phase: str, text: str) -> str:
    """Build the ``[aide]`` commit subject line for a flow action.

    "issue", "error", "back-step" and "back-part" include the action name
    after the phase; every other action produces ``[aide] <phase>: <text>``.
    """
    labelled_actions = ("issue", "error", "back-step", "back-part")
    if action in labelled_actions:
        return f"[aide] {phase} {action}: {text}"
    return f"[aide] {phase}: {text}"