feat: 完成aide flow程序实现

This commit is contained in:
2025-12-14 22:28:37 +08:00
parent a0c9173bc4
commit 1cf99c7b5f
11 changed files with 731 additions and 1 deletions

View File

@@ -1,5 +1,7 @@
import sys
from aide.main import main from aide.main import main
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

View File

@@ -0,0 +1,2 @@
"""aide flow进度追踪与 Git 集成。"""

View File

@@ -0,0 +1,8 @@
"""错误类型:用于将内部失败统一映射为 CLI 输出。"""
from __future__ import annotations
class FlowError(RuntimeError):
pass

View File

@@ -0,0 +1,78 @@
"""Git 操作封装add、commit、查询提交变更文件。"""
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
from aide.flow.errors import FlowError
class GitIntegration:
def __init__(self, root: Path):
self.root = root
def ensure_available(self) -> None:
if shutil.which("git") is None:
raise FlowError("未找到 git 命令,请先安装 git")
def ensure_repo(self) -> None:
self.ensure_available()
result = self._run(["rev-parse", "--is-inside-work-tree"], check=False)
if result.returncode != 0 or "true" not in (result.stdout or ""):
raise FlowError("当前目录不是 git 仓库,请先执行 git init 或切换到正确目录")
def add_all(self) -> None:
self.ensure_repo()
result = self._run(["add", "."], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("git add 失败", result))
def commit(self, message: str) -> str | None:
self.ensure_repo()
diff = self._run(["diff", "--cached", "--quiet"], check=False)
if diff.returncode == 0:
return None
if diff.returncode != 1:
raise FlowError(_format_git_error("git diff 失败", diff))
result = self._run(["commit", "-m", message], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("git commit 失败", result))
return self.rev_parse_head()
def rev_parse_head(self) -> str:
result = self._run(["rev-parse", "HEAD"], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("获取 commit hash 失败", result))
return (result.stdout or "").strip()
def status_porcelain(self, path: str) -> str:
result = self._run(["status", "--porcelain", "--", path], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("git status 失败", result))
return result.stdout or ""
def commit_touches_path(self, commit_hash: str, path: str) -> bool:
result = self._run(["show", "--name-only", "--pretty=format:", commit_hash], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"读取提交内容失败: {commit_hash}", result))
files = [line.strip() for line in (result.stdout or "").splitlines() if line.strip()]
return path in files
def _run(self, args: list[str], check: bool) -> subprocess.CompletedProcess[str]:
return subprocess.run(
["git", *args],
cwd=self.root,
text=True,
capture_output=True,
check=check,
)
def _format_git_error(prefix: str, result: subprocess.CompletedProcess[str]) -> str:
detail = (result.stderr or "").strip() or (result.stdout or "").strip()
if not detail:
return prefix
return f"{prefix}: {detail}"

View File

@@ -0,0 +1,85 @@
"""环节钩子PlantUML 与 CHANGELOG 校验。"""
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
from aide.core import output
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.types import FlowStatus
def run_pre_commit_hooks(
*,
root: Path,
git: GitIntegration,
status: FlowStatus | None,
from_phase: str | None,
to_phase: str,
action: str,
) -> None:
if from_phase == "flow-design" and action in {"next-part", "back-part"}:
_hook_plantuml(root=root)
if from_phase == "docs" and action in {"next-part", "back-part"}:
_hook_changelog_on_leave_docs(root=root, git=git, status=status)
def run_post_commit_hooks(*, to_phase: str, action: str) -> None:
if to_phase == "docs" and action in {"start", "next-part", "back-part"}:
output.info("请更新 CHANGELOG.md")
def _hook_plantuml(*, root: Path) -> None:
docs_dir = root / "docs"
discuss_dir = root / "discuss"
candidates: list[Path] = []
for base in (docs_dir, discuss_dir):
if not base.exists():
continue
candidates.extend([p for p in base.rglob("*.puml") if p.is_file()])
candidates.extend([p for p in base.rglob("*.plantuml") if p.is_file()])
if not candidates:
return
if shutil.which("plantuml") is None:
output.warn("未找到 plantuml已跳过 PlantUML 校验/PNG 生成")
return
for file_path in candidates:
result = subprocess.run(
["plantuml", "-tpng", str(file_path)],
cwd=root,
text=True,
capture_output=True,
)
if result.returncode != 0:
detail = (result.stderr or "").strip() or (result.stdout or "").strip()
raise FlowError(f"PlantUML 处理失败: {file_path} {detail}".strip())
def _hook_changelog_on_leave_docs(*, root: Path, git: GitIntegration, status: FlowStatus | None) -> None:
changelog = root / "CHANGELOG.md"
if not changelog.exists():
raise FlowError("离开 docs 前需要更新 CHANGELOG.md当前文件不存在")
git.ensure_repo()
if git.status_porcelain("CHANGELOG.md").strip():
return
if status is None:
raise FlowError("离开 docs 前需要更新 CHANGELOG.md未找到流程状态")
for entry in status.history:
if entry.phase != "docs":
continue
if not entry.git_commit:
continue
if git.commit_touches_path(entry.git_commit, "CHANGELOG.md"):
return
raise FlowError("离开 docs 前需要更新 CHANGELOG.md未检测到 docs 阶段的更新记录)")

View File

@@ -0,0 +1,91 @@
"""状态文件读写:锁、原子写入、归档。"""
from __future__ import annotations
import json
import os
import time
from contextlib import contextmanager
from pathlib import Path
from aide.flow.errors import FlowError
from aide.flow.types import FlowStatus
from aide.flow.utils import now_task_id
class FlowStorage:
def __init__(self, root: Path):
self.root = root
self.aide_dir = self.root / ".aide"
self.status_path = self.aide_dir / "flow-status.json"
self.lock_path = self.aide_dir / "flow-status.lock"
self.tmp_path = self.aide_dir / "flow-status.json.tmp"
self.logs_dir = self.aide_dir / "logs"
def ensure_ready(self) -> None:
if not self.aide_dir.exists():
raise FlowError("未找到 .aide 目录请先运行aide init")
self.logs_dir.mkdir(parents=True, exist_ok=True)
@contextmanager
def lock(self, timeout_seconds: float = 3.0, poll_seconds: float = 0.2):
self.ensure_ready()
start = time.time()
fd: int | None = None
while True:
try:
fd = os.open(str(self.lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.write(fd, str(os.getpid()).encode("utf-8"))
break
except FileExistsError:
if time.time() - start >= timeout_seconds:
raise FlowError("状态文件被占用,请稍后重试或删除 .aide/flow-status.lock")
time.sleep(poll_seconds)
try:
yield
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
try:
self.lock_path.unlink(missing_ok=True)
except Exception:
pass
def load_status(self) -> FlowStatus | None:
if not self.status_path.exists():
return None
try:
raw = self.status_path.read_text(encoding="utf-8")
data = json.loads(raw)
if not isinstance(data, dict):
raise ValueError("状态文件顶层必须为对象")
return FlowStatus.from_dict(data)
except Exception as exc:
raise FlowError(f"状态文件解析失败: {exc}")
def save_status(self, status: FlowStatus) -> None:
payload = json.dumps(status.to_dict(), ensure_ascii=False, indent=2) + "\n"
try:
self.tmp_path.write_text(payload, encoding="utf-8")
os.replace(self.tmp_path, self.status_path)
except Exception as exc:
raise FlowError(f"写入状态文件失败: {exc}")
def archive_existing_status(self) -> None:
if not self.status_path.exists():
return
suffix = now_task_id()
try:
current = self.load_status()
suffix = current.task_id
except FlowError:
pass
target = self.logs_dir / f"flow-status.{suffix}.json"
try:
os.replace(self.status_path, target)
except Exception as exc:
raise FlowError(f"归档旧状态失败: {exc}")

View File

@@ -0,0 +1,198 @@
"""FlowTracker编排一次 flow 动作(校验 → hooks → git → 落盘 → 输出)。"""
from __future__ import annotations
from pathlib import Path
from aide.core import output
from aide.core.config import ConfigManager
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.hooks import run_post_commit_hooks, run_pre_commit_hooks
from aide.flow.storage import FlowStorage
from aide.flow.types import FlowStatus, HistoryEntry
from aide.flow.utils import now_iso, now_task_id, normalize_text
from aide.flow.validator import FlowValidator
DEFAULT_PHASES = ["task-optimize", "flow-design", "impl", "verify", "docs", "finish"]
class FlowTracker:
def __init__(self, root: Path, cfg: ConfigManager):
self.root = root
self.cfg = cfg
self.storage = FlowStorage(root)
self.git = GitIntegration(root)
def start(self, phase: str, summary: str) -> bool:
return self._run(action="start", to_phase=phase, text=summary)
def next_step(self, summary: str) -> bool:
return self._run(action="next-step", to_phase=None, text=summary)
def back_step(self, reason: str) -> bool:
return self._run(action="back-step", to_phase=None, text=reason)
def next_part(self, phase: str, summary: str) -> bool:
return self._run(action="next-part", to_phase=phase, text=summary)
def back_part(self, phase: str, reason: str) -> bool:
return self._run(action="back-part", to_phase=phase, text=reason)
def issue(self, description: str) -> bool:
return self._run(action="issue", to_phase=None, text=description)
def error(self, description: str) -> bool:
return self._run(action="error", to_phase=None, text=description)
def _run(self, *, action: str, to_phase: str | None, text: str) -> bool:
try:
self.storage.ensure_ready()
config = self.cfg.load_config()
phases = _get_phases(config)
validator = FlowValidator(phases)
normalized_text = normalize_text(text)
if not normalized_text:
raise FlowError("文本参数不能为空")
with self.storage.lock():
if action == "start":
assert to_phase is not None
validator.validate_start(to_phase)
self.storage.archive_existing_status()
status = FlowStatus(
task_id=now_task_id(),
current_phase=to_phase,
current_step=0,
started_at=now_iso(),
history=[],
)
updated = self._apply_action(
status=status,
action=action,
from_phase=None,
to_phase=to_phase,
text=normalized_text,
validator=validator,
)
self.storage.save_status(updated)
output.ok(f"任务开始: {to_phase}")
run_post_commit_hooks(to_phase=to_phase, action=action)
return True
status = self.storage.load_status()
if status is None:
raise FlowError("未找到流程状态请先运行aide flow start <环节名> \"<总结>\"")
current_phase = status.current_phase
validator.validate_phase_exists(current_phase)
if action == "next-part":
assert to_phase is not None
validator.validate_next_part(current_phase, to_phase)
elif action == "back-part":
assert to_phase is not None
validator.validate_back_part(current_phase, to_phase)
else:
to_phase = current_phase
updated = self._apply_action(
status=status,
action=action,
from_phase=current_phase,
to_phase=to_phase,
text=normalized_text,
validator=validator,
)
self.storage.save_status(updated)
if action == "next-part":
output.ok(f"进入环节: {to_phase}")
elif action == "back-part":
output.warn(f"回退到环节: {to_phase}")
elif action == "error":
output.err(f"错误已记录: {normalized_text}")
run_post_commit_hooks(to_phase=to_phase, action=action)
return True
except FlowError as exc:
output.err(str(exc))
return False
def _apply_action(
self,
*,
status: FlowStatus,
action: str,
from_phase: str | None,
to_phase: str,
text: str,
validator: FlowValidator,
) -> FlowStatus:
if action in {"next-part", "back-part"} and from_phase is None:
raise FlowError("内部错误:缺少 from_phase")
if action == "next-part":
assert from_phase is not None
validator.validate_next_part(from_phase, to_phase)
elif action == "back-part":
assert from_phase is not None
validator.validate_back_part(from_phase, to_phase)
elif action == "start":
validator.validate_start(to_phase)
else:
validator.validate_phase_exists(to_phase)
run_pre_commit_hooks(
root=self.root,
git=self.git,
status=status,
from_phase=from_phase,
to_phase=to_phase,
action=action,
)
message = _build_commit_message(action=action, phase=to_phase, text=text)
self.git.add_all()
commit_hash = self.git.commit(message)
next_step = status.current_step + 1
entry = HistoryEntry(
timestamp=now_iso(),
action=action,
phase=to_phase,
step=next_step,
summary=text,
git_commit=commit_hash,
)
history = [*status.history, entry]
return FlowStatus(
task_id=status.task_id,
current_phase=to_phase,
current_step=next_step,
started_at=status.started_at,
history=history,
)
def _get_phases(config: dict) -> list[str]:
flow_cfg = config.get("flow", {})
phases = flow_cfg.get("phases", DEFAULT_PHASES)
if not isinstance(phases, list) or not phases:
return DEFAULT_PHASES
return phases
def _build_commit_message(*, action: str, phase: str, text: str) -> str:
if action == "issue":
return f"[aide] {phase} issue: {text}"
if action == "error":
return f"[aide] {phase} error: {text}"
if action == "back-step":
return f"[aide] {phase} back-step: {text}"
if action == "back-part":
return f"[aide] {phase} back-part: {text}"
return f"[aide] {phase}: {text}"

View File

@@ -0,0 +1,102 @@
"""数据结构:流程状态与历史条目。"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class HistoryEntry:
timestamp: str
action: str
phase: str
step: int
summary: str
git_commit: str | None = None
def to_dict(self) -> dict[str, Any]:
data: dict[str, Any] = {
"timestamp": self.timestamp,
"action": self.action,
"phase": self.phase,
"step": self.step,
"summary": self.summary,
}
if self.git_commit is not None:
data["git_commit"] = self.git_commit
return data
@staticmethod
def from_dict(data: dict[str, Any]) -> "HistoryEntry":
timestamp = _require_str(data, "timestamp")
action = _require_str(data, "action")
phase = _require_str(data, "phase")
step = _require_int(data, "step")
summary = _require_str(data, "summary")
git_commit = data.get("git_commit")
if git_commit is not None and not isinstance(git_commit, str):
raise ValueError("git_commit 必须为字符串或缺失")
return HistoryEntry(
timestamp=timestamp,
action=action,
phase=phase,
step=step,
summary=summary,
git_commit=git_commit,
)
@dataclass(frozen=True)
class FlowStatus:
task_id: str
current_phase: str
current_step: int
started_at: str
history: list[HistoryEntry]
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"current_phase": self.current_phase,
"current_step": self.current_step,
"started_at": self.started_at,
"history": [h.to_dict() for h in self.history],
}
@staticmethod
def from_dict(data: dict[str, Any]) -> "FlowStatus":
task_id = _require_str(data, "task_id")
current_phase = _require_str(data, "current_phase")
current_step = _require_int(data, "current_step")
started_at = _require_str(data, "started_at")
raw_history = data.get("history")
if not isinstance(raw_history, list):
raise ValueError("history 必须为列表")
history: list[HistoryEntry] = []
for item in raw_history:
if not isinstance(item, dict):
raise ValueError("history 条目必须为对象")
history.append(HistoryEntry.from_dict(item))
return FlowStatus(
task_id=task_id,
current_phase=current_phase,
current_step=current_step,
started_at=started_at,
history=history,
)
def _require_str(data: dict[str, Any], key: str) -> str:
value = data.get(key)
if not isinstance(value, str) or not value.strip():
raise ValueError(f"{key} 必须为非空字符串")
return value
def _require_int(data: dict[str, Any], key: str) -> int:
value = data.get(key)
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError(f"{key} 必须为整数")
return value

View File

@@ -0,0 +1,18 @@
"""工具函数:时间戳与文本处理。"""
from __future__ import annotations
from datetime import datetime
def now_iso() -> str:
return datetime.now().astimezone().isoformat(timespec="seconds")
def now_task_id() -> str:
return datetime.now().astimezone().strftime("%Y-%m-%dT%H-%M-%S")
def normalize_text(value: str) -> str:
return value.strip()

View File

@@ -0,0 +1,54 @@
"""流程校验:环节合法性与跳转规则。"""
from __future__ import annotations
from aide.flow.errors import FlowError
class FlowValidator:
def __init__(self, phases: list[str]):
self.phases = _normalize_phases(phases)
def validate_phase_exists(self, phase: str) -> None:
if phase not in self.phases:
raise FlowError(f"未知环节: {phase}(请检查 flow.phases 配置)")
def validate_start(self, phase: str) -> None:
self.validate_phase_exists(phase)
def validate_next_part(self, from_phase: str, to_phase: str) -> None:
self.validate_phase_exists(from_phase)
self.validate_phase_exists(to_phase)
from_index = self.phases.index(from_phase)
to_index = self.phases.index(to_phase)
if to_index != from_index + 1:
raise FlowError(
f"非法跳转: {from_phase} -> {to_phase}next-part 只能前进到相邻环节)"
)
def validate_back_part(self, from_phase: str, to_phase: str) -> None:
self.validate_phase_exists(from_phase)
self.validate_phase_exists(to_phase)
from_index = self.phases.index(from_phase)
to_index = self.phases.index(to_phase)
if to_index >= from_index:
raise FlowError(
f"非法回退: {from_phase} -> {to_phase}back-part 只能回退到之前环节)"
)
def _normalize_phases(phases: list[str]) -> list[str]:
if not isinstance(phases, list) or not phases:
raise FlowError("flow.phases 配置无效:必须为非空列表")
normalized: list[str] = []
seen: set[str] = set()
for item in phases:
if not isinstance(item, str) or not item.strip():
raise FlowError("flow.phases 配置无效:环节名必须为非空字符串")
name = item.strip()
if name in seen:
raise FlowError(f"flow.phases 配置无效:环节名重复 {name!r}")
seen.add(name)
normalized.append(name)
return normalized

View File

@@ -10,6 +10,7 @@ from typing import Any
from aide.core import output from aide.core import output
from aide.core.config import ConfigManager from aide.core.config import ConfigManager
from aide.env.manager import EnvManager from aide.env.manager import EnvManager
from aide.flow.tracker import FlowTracker
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
@@ -91,6 +92,43 @@ def build_parser() -> argparse.ArgumentParser:
set_parser.add_argument("value", help="要写入的值,支持 bool/int/float/字符串") set_parser.add_argument("value", help="要写入的值,支持 bool/int/float/字符串")
set_parser.set_defaults(func=handle_config_set) set_parser.set_defaults(func=handle_config_set)
# aide flow
flow_parser = subparsers.add_parser("flow", help="进度追踪与 git 集成")
flow_sub = flow_parser.add_subparsers(dest="flow_command")
flow_start = flow_sub.add_parser("start", help="开始新任务")
flow_start.add_argument("phase", help="环节名(来自 flow.phases")
flow_start.add_argument("summary", help="本次操作的简要说明")
flow_start.set_defaults(func=handle_flow_start)
flow_next_step = flow_sub.add_parser("next-step", help="记录步骤前进")
flow_next_step.add_argument("summary", help="本次操作的简要说明")
flow_next_step.set_defaults(func=handle_flow_next_step)
flow_back_step = flow_sub.add_parser("back-step", help="记录步骤回退")
flow_back_step.add_argument("reason", help="回退原因")
flow_back_step.set_defaults(func=handle_flow_back_step)
flow_next_part = flow_sub.add_parser("next-part", help="进入下一环节")
flow_next_part.add_argument("phase", help="目标环节名(相邻下一环节)")
flow_next_part.add_argument("summary", help="本次操作的简要说明")
flow_next_part.set_defaults(func=handle_flow_next_part)
flow_back_part = flow_sub.add_parser("back-part", help="回退到之前环节")
flow_back_part.add_argument("phase", help="目标环节名(任意之前环节)")
flow_back_part.add_argument("reason", help="回退原因")
flow_back_part.set_defaults(func=handle_flow_back_part)
flow_issue = flow_sub.add_parser("issue", help="记录一般问题(不阻塞继续)")
flow_issue.add_argument("description", help="问题描述")
flow_issue.set_defaults(func=handle_flow_issue)
flow_error = flow_sub.add_parser("error", help="记录严重错误(需要用户关注)")
flow_error.add_argument("description", help="错误描述")
flow_error.set_defaults(func=handle_flow_error)
flow_parser.set_defaults(func=handle_flow_help)
parser.add_argument("--version", action="version", version="aide dev") parser.add_argument("--version", action="version", version="aide dev")
return parser return parser
@@ -190,6 +228,60 @@ def handle_config_set(args: argparse.Namespace) -> bool:
return True return True
def handle_flow_help(args: argparse.Namespace) -> bool:
output.info("用法: aide flow <start|next-step|back-step|next-part|back-part|issue|error> ...")
return True
def handle_flow_start(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.start(args.phase, args.summary)
def handle_flow_next_step(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.next_step(args.summary)
def handle_flow_back_step(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.back_step(args.reason)
def handle_flow_next_part(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.next_part(args.phase, args.summary)
def handle_flow_back_part(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.back_part(args.phase, args.reason)
def handle_flow_issue(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.issue(args.description)
def handle_flow_error(args: argparse.Namespace) -> bool:
root = Path.cwd()
cfg = ConfigManager(root)
tracker = FlowTracker(root, cfg)
return tracker.error(args.description)
def _parse_value(raw: str) -> Any: def _parse_value(raw: str) -> Any:
lowered = raw.lower() lowered = raw.lower()
if lowered in {"true", "false"}: if lowered in {"true", "false"}: