Files
agent-aide/aide-program/aide/flow/tracker.py

261 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FlowTracker编排一次 flow 动作(校验 → hooks → 落盘 → git → 输出)。"""
from __future__ import annotations
from pathlib import Path
from aide.core import output
from aide.core.config import ConfigManager
from aide.flow.branch import BranchManager
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.hooks import run_post_commit_hooks, run_pre_commit_hooks
from aide.flow.storage import FlowStorage
from aide.flow.types import FlowStatus, HistoryEntry
from aide.flow.utils import now_iso, now_task_id, normalize_text
from aide.flow.validator import FlowValidator
DEFAULT_PHASES = ["task-optimize", "flow-design", "impl", "verify", "docs", "finish"]
class FlowTracker:
def __init__(self, root: Path, cfg: ConfigManager):
self.root = root
self.cfg = cfg
self.storage = FlowStorage(root)
self.git = GitIntegration(root)
self.branch_mgr = BranchManager(root, self.git)
def start(self, phase: str, summary: str) -> bool:
return self._run(action="start", to_phase=phase, text=summary)
def next_step(self, summary: str) -> bool:
return self._run(action="next-step", to_phase=None, text=summary)
def back_step(self, reason: str) -> bool:
return self._run(action="back-step", to_phase=None, text=reason)
def next_part(self, phase: str, summary: str) -> bool:
return self._run(action="next-part", to_phase=phase, text=summary)
def back_part(self, phase: str, reason: str) -> bool:
return self._run(action="back-part", to_phase=phase, text=reason)
def issue(self, description: str) -> bool:
return self._run(action="issue", to_phase=None, text=description)
def error(self, description: str) -> bool:
return self._run(action="error", to_phase=None, text=description)
def _run(self, *, action: str, to_phase: str | None, text: str) -> bool:
try:
self.storage.ensure_ready()
config = self.cfg.load_config()
phases = _get_phases(config)
validator = FlowValidator(phases)
normalized_text = normalize_text(text)
if not normalized_text:
raise FlowError("文本参数不能为空")
with self.storage.lock():
if action == "start":
assert to_phase is not None
validator.validate_start(to_phase)
self.storage.archive_existing_status()
# 创建任务分支
task_id = now_task_id()
task_branch = self.branch_mgr.create_task_branch(
task_id=task_id,
task_summary=normalized_text,
)
branch_info = self.branch_mgr.get_active_branch_info()
status = FlowStatus(
task_id=task_id,
current_phase=to_phase,
current_step=0,
started_at=now_iso(),
history=[],
source_branch=branch_info.source_branch if branch_info else None,
start_commit=branch_info.start_commit if branch_info else None,
task_branch=task_branch,
)
updated, commit_msg = self._apply_action(
status=status,
action=action,
from_phase=None,
to_phase=to_phase,
text=normalized_text,
validator=validator,
)
# 先保存状态,再执行 git 操作
self.storage.save_status(updated)
final_status = self._do_git_commit(updated, commit_msg)
self.storage.save_status(final_status)
output.ok(f"任务开始: {to_phase} (分支: {task_branch})")
run_post_commit_hooks(to_phase=to_phase, action=action)
return True
status = self.storage.load_status()
if status is None:
raise FlowError("未找到流程状态请先运行aide flow start <环节名> \"<总结>\"")
current_phase = status.current_phase
validator.validate_phase_exists(current_phase)
if action == "next-part":
assert to_phase is not None
validator.validate_next_part(current_phase, to_phase)
# 如果进入 finish 环节,执行分支合并
if to_phase == "finish":
success, merge_msg = self.branch_mgr.finish_branch_merge(
task_summary=normalized_text,
)
if not success:
output.warn(merge_msg)
elif action == "back-part":
assert to_phase is not None
validator.validate_back_part(current_phase, to_phase)
else:
to_phase = current_phase
updated, commit_msg = self._apply_action(
status=status,
action=action,
from_phase=current_phase,
to_phase=to_phase,
text=normalized_text,
validator=validator,
)
# 先保存状态,再执行 git 操作
self.storage.save_status(updated)
final_status = self._do_git_commit(updated, commit_msg)
self.storage.save_status(final_status)
if action == "next-part":
output.ok(f"进入环节: {to_phase}")
elif action == "back-part":
output.warn(f"回退到环节: {to_phase}")
elif action == "error":
output.err(f"错误已记录: {normalized_text}")
run_post_commit_hooks(to_phase=to_phase, action=action)
return True
except FlowError as exc:
output.err(str(exc))
return False
def _apply_action(
self,
*,
status: FlowStatus,
action: str,
from_phase: str | None,
to_phase: str,
text: str,
validator: FlowValidator,
) -> tuple[FlowStatus, str]:
"""应用动作,返回 (更新后的状态, commit消息)。不执行 git 操作。"""
if action in {"next-part", "back-part"} and from_phase is None:
raise FlowError("内部错误:缺少 from_phase")
if action == "next-part":
assert from_phase is not None
validator.validate_next_part(from_phase, to_phase)
elif action == "back-part":
assert from_phase is not None
validator.validate_back_part(from_phase, to_phase)
elif action == "start":
validator.validate_start(to_phase)
else:
validator.validate_phase_exists(to_phase)
config = self.cfg.load_config()
run_pre_commit_hooks(
root=self.root,
git=self.git,
status=status,
from_phase=from_phase,
to_phase=to_phase,
action=action,
config=config,
)
message = _build_commit_message(action=action, phase=to_phase, text=text)
next_step = status.current_step + 1
entry = HistoryEntry(
timestamp=now_iso(),
action=action,
phase=to_phase,
step=next_step,
summary=text,
git_commit=None, # 暂时为 None后续在 git 提交后更新
)
history = [*status.history, entry]
updated_status = FlowStatus(
task_id=status.task_id,
current_phase=to_phase,
current_step=next_step,
started_at=status.started_at,
history=history,
source_branch=status.source_branch,
start_commit=status.start_commit,
task_branch=status.task_branch,
)
return updated_status, message
def _do_git_commit(self, status: FlowStatus, message: str) -> FlowStatus:
"""执行 git add + commit并更新状态中的 commit hash。"""
self.git.add_all()
commit_hash = self.git.commit(message)
# 更新最后一条历史记录的 git_commit
if status.history:
last_entry = status.history[-1]
updated_entry = HistoryEntry(
timestamp=last_entry.timestamp,
action=last_entry.action,
phase=last_entry.phase,
step=last_entry.step,
summary=last_entry.summary,
git_commit=commit_hash,
)
updated_history = [*status.history[:-1], updated_entry]
return FlowStatus(
task_id=status.task_id,
current_phase=status.current_phase,
current_step=status.current_step,
started_at=status.started_at,
history=updated_history,
source_branch=status.source_branch,
start_commit=status.start_commit,
task_branch=status.task_branch,
)
return status
def _get_phases(config: dict) -> list[str]:
flow_cfg = config.get("flow", {})
phases = flow_cfg.get("phases", DEFAULT_PHASES)
if not isinstance(phases, list) or not phases:
return DEFAULT_PHASES
return phases
def _build_commit_message(*, action: str, phase: str, text: str) -> str:
if action == "issue":
return f"[aide] {phase} issue: {text}"
if action == "error":
return f"[aide] {phase} error: {text}"
if action == "back-step":
return f"[aide] {phase} back-step: {text}"
if action == "back-part":
return f"[aide] {phase} back-part: {text}"
return f"[aide] {phase}: {text}"