[aide] impl: 子计划1 Git分支管理核心功能实现完成

This commit is contained in:
2025-12-17 04:08:17 +08:00
parent cf52da4083
commit 79a22bc137
6 changed files with 489 additions and 6 deletions

View File

@@ -1,7 +1,7 @@
{
"task_id": "2025-12-17T03-13-17",
"current_phase": "impl",
"current_step": 8,
"current_step": 9,
"started_at": "2025-12-17T03:13:17+08:00",
"history": [
{
@@ -65,7 +65,15 @@
"action": "next-part",
"phase": "impl",
"step": 8,
"summary": "流程设计完成开始实现子计划1Git分支管理核心功能"
"summary": "流程设计完成开始实现子计划1Git分支管理核心功能",
"git_commit": "cf52da408315f0beb7e1d64cc37e48e35f81d8a1"
},
{
"timestamp": "2025-12-17T04:08:17+08:00",
"action": "next-step",
"phase": "impl",
"step": 9,
"summary": "子计划1 Git分支管理核心功能实现完成"
}
]
}

View File

@@ -1 +1 @@
26012
26989

View File

@@ -0,0 +1,368 @@
"""BranchManager管理 aide flow 任务分支编号和概况文档。"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.utils import now_iso
@dataclass
class BranchInfo:
"""分支信息记录"""
number: int
branch_name: str
source_branch: str
start_commit: str
end_commit: str | None
task_id: str
task_summary: str
started_at: str
finished_at: str | None
status: str # "active", "finished", "merged-to-temp"
temp_branch: str | None = None
def to_dict(self) -> dict[str, Any]:
data: dict[str, Any] = {
"number": self.number,
"branch_name": self.branch_name,
"source_branch": self.source_branch,
"start_commit": self.start_commit,
"task_id": self.task_id,
"task_summary": self.task_summary,
"started_at": self.started_at,
"status": self.status,
}
if self.end_commit is not None:
data["end_commit"] = self.end_commit
if self.finished_at is not None:
data["finished_at"] = self.finished_at
if self.temp_branch is not None:
data["temp_branch"] = self.temp_branch
return data
@staticmethod
def from_dict(data: dict[str, Any]) -> "BranchInfo":
return BranchInfo(
number=data["number"],
branch_name=data["branch_name"],
source_branch=data["source_branch"],
start_commit=data["start_commit"],
end_commit=data.get("end_commit"),
task_id=data["task_id"],
task_summary=data["task_summary"],
started_at=data["started_at"],
finished_at=data.get("finished_at"),
status=data.get("status", "active"),
temp_branch=data.get("temp_branch"),
)
@dataclass
class BranchesData:
"""分支概况数据"""
next_number: int
branches: list[BranchInfo]
def to_dict(self) -> dict[str, Any]:
return {
"next_number": self.next_number,
"branches": [b.to_dict() for b in self.branches],
}
@staticmethod
def from_dict(data: dict[str, Any]) -> "BranchesData":
next_number = data.get("next_number", 1)
branches_raw = data.get("branches", [])
branches = [BranchInfo.from_dict(b) for b in branches_raw]
return BranchesData(next_number=next_number, branches=branches)
class BranchManager:
"""管理 aide flow 任务分支"""
def __init__(self, root: Path, git: GitIntegration):
self.root = root
self.git = git
self.aide_dir = root / ".aide"
self.branches_json = self.aide_dir / "branches.json"
self.branches_md = self.aide_dir / "branches.md"
self._data: BranchesData | None = None
self._current_branch_info: BranchInfo | None = None
def load_branches(self) -> BranchesData:
"""加载分支概况"""
if self._data is not None:
return self._data
if not self.branches_json.exists():
self._data = BranchesData(next_number=1, branches=[])
return self._data
try:
content = self.branches_json.read_text(encoding="utf-8")
data = json.loads(content)
self._data = BranchesData.from_dict(data)
return self._data
except (json.JSONDecodeError, KeyError, TypeError) as e:
raise FlowError(f"读取分支概况失败: {e}") from e
def save_branches(self) -> None:
"""保存分支概况(同时生成 JSON 和 MD"""
if self._data is None:
return
self.aide_dir.mkdir(parents=True, exist_ok=True)
# 保存 JSON
json_content = json.dumps(
self._data.to_dict(),
ensure_ascii=False,
indent=2,
)
self.branches_json.write_text(json_content + "\n", encoding="utf-8")
# 生成并保存 MD
md_content = self._generate_markdown()
self.branches_md.write_text(md_content, encoding="utf-8")
def _generate_markdown(self) -> str:
"""生成 Markdown 格式的分支概况"""
if self._data is None:
return "# Git 分支概况\n\n暂无分支记录。\n"
lines = ["# Git 分支概况\n"]
if not self._data.branches:
lines.append("暂无分支记录。\n")
return "\n".join(lines)
for branch in reversed(self._data.branches):
lines.append(f"## {branch.branch_name}\n")
lines.append(f"- **任务**: {branch.task_summary}")
lines.append(f"- **任务ID**: {branch.task_id}")
lines.append(f"- **源分支**: {branch.source_branch}")
lines.append(f"- **起始提交**: {branch.start_commit[:7]}")
if branch.end_commit:
lines.append(f"- **结束提交**: {branch.end_commit[:7]}")
lines.append(f"- **状态**: {branch.status}")
time_str = branch.started_at[:16].replace("T", " ")
if branch.finished_at:
time_str += f" ~ {branch.finished_at[11:16]}"
lines.append(f"- **时间**: {time_str}")
if branch.temp_branch:
lines.append(f"- **临时分支**: {branch.temp_branch}")
lines.append("")
return "\n".join(lines)
def get_next_branch_number(self) -> int:
"""获取下一个分支编号"""
data = self.load_branches()
return data.next_number
def create_task_branch(
self,
task_id: str,
task_summary: str,
) -> str:
"""创建任务分支并记录信息
返回创建的分支名称
"""
self.git.ensure_repo()
data = self.load_branches()
# 确保 git 状态干净
if not self.git.is_clean():
self.git.add_all()
self.git.commit("[aide] 保存未提交的变更")
# 确保有提交历史
if not self.git.has_commits():
gitkeep = self.root / ".gitkeep"
if not gitkeep.exists():
gitkeep.touch()
self.git.add_all()
self.git.commit("[aide] 初始提交")
# 记录起始信息
source_branch = self.git.get_current_branch()
start_commit = self.git.rev_parse_head()
# 创建分支名
branch_number = data.next_number
branch_name = f"aide/{branch_number:03d}"
# 创建并切换到任务分支
self.git.checkout_new_branch(branch_name)
# 记录分支信息
branch_info = BranchInfo(
number=branch_number,
branch_name=branch_name,
source_branch=source_branch,
start_commit=start_commit,
end_commit=None,
task_id=task_id,
task_summary=task_summary,
started_at=now_iso(),
finished_at=None,
status="active",
)
# 更新数据
data.branches.append(branch_info)
data.next_number = branch_number + 1
self._data = data
self._current_branch_info = branch_info
# 保存
self.save_branches()
return branch_name
def get_active_branch_info(self) -> BranchInfo | None:
"""获取当前活跃的分支信息"""
if self._current_branch_info is not None:
return self._current_branch_info
data = self.load_branches()
current_branch = self.git.get_current_branch()
for branch in data.branches:
if branch.branch_name == current_branch and branch.status == "active":
self._current_branch_info = branch
return branch
return None
def record_branch_finish(
self,
status: str = "finished",
end_commit: str | None = None,
temp_branch: str | None = None,
) -> None:
"""记录分支结束信息"""
data = self.load_branches()
branch_info = self.get_active_branch_info()
if branch_info is None:
return
# 更新分支信息
for i, branch in enumerate(data.branches):
if branch.number == branch_info.number:
data.branches[i] = BranchInfo(
number=branch.number,
branch_name=branch.branch_name,
source_branch=branch.source_branch,
start_commit=branch.start_commit,
end_commit=end_commit or self.git.rev_parse_head(),
task_id=branch.task_id,
task_summary=branch.task_summary,
started_at=branch.started_at,
finished_at=now_iso(),
status=status,
temp_branch=temp_branch,
)
break
self._data = data
self._current_branch_info = None
self.save_branches()
def finish_branch_merge(self, task_summary: str) -> tuple[bool, str]:
"""执行分支合并逻辑
返回 (是否成功, 消息)
"""
branch_info = self.get_active_branch_info()
if branch_info is None:
return True, "未找到活跃的任务分支,跳过合并"
source_branch = branch_info.source_branch
start_commit = branch_info.start_commit
task_branch = branch_info.branch_name
# 检查源分支是否有新提交
if self.git.has_commits_since(start_commit, source_branch):
# 源分支有新提交,使用临时分支策略
return self._merge_with_temp_branch(
branch_info=branch_info,
task_summary=task_summary,
)
else:
# 正常合并流程
return self._merge_normal(
branch_info=branch_info,
task_summary=task_summary,
)
def _merge_normal(
self,
branch_info: BranchInfo,
task_summary: str,
) -> tuple[bool, str]:
"""正常合并流程:软重置 + 压缩提交"""
source_branch = branch_info.source_branch
start_commit = branch_info.start_commit
# 切回源分支
self.git.checkout(source_branch)
# 软重置到起始提交
self.git.reset_soft(start_commit)
# 创建压缩提交
self.git.add_all()
commit_msg = f"[aide] 任务: {task_summary}"
end_commit = self.git.commit(commit_msg)
# 记录完成
self.record_branch_finish(
status="finished",
end_commit=end_commit,
)
return True, f"任务分支已合并到 {source_branch}"
def _merge_with_temp_branch(
self,
branch_info: BranchInfo,
task_summary: str,
) -> tuple[bool, str]:
"""临时分支合并策略:源分支有新提交时使用"""
start_commit = branch_info.start_commit
task_branch = branch_info.branch_name
temp_branch = f"{task_branch}-merge"
# 从起始提交检出临时分支
self.git.checkout_new_branch(temp_branch, start_commit)
# 在临时分支执行 squash 合并
self.git.merge_squash(task_branch)
# 创建提交
self.git.add_all()
commit_msg = f"[aide] 任务压缩提交: {task_summary}"
end_commit = self.git.commit(commit_msg)
# 记录完成(保留任务分支和临时分支)
self.record_branch_finish(
status="merged-to-temp",
end_commit=end_commit,
temp_branch=temp_branch,
)
return False, (
f"⚠ 源分支 {branch_info.source_branch} 有新提交\n"
f"已在临时分支 {temp_branch} 完成合并\n"
f"请手动处理后续操作"
)

View File

@@ -61,6 +61,71 @@ class GitIntegration:
files = [line.strip() for line in (result.stdout or "").splitlines() if line.strip()]
return path in files
# === 分支管理新增方法 ===
def get_current_branch(self) -> str:
"""获取当前分支名"""
result = self._run(["rev-parse", "--abbrev-ref", "HEAD"], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("获取当前分支失败", result))
return (result.stdout or "").strip()
def is_clean(self) -> bool:
"""检查工作目录是否干净(无未提交的变更)"""
result = self._run(["status", "--porcelain"], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error("检查 git 状态失败", result))
return not (result.stdout or "").strip()
def has_commits(self) -> bool:
"""检查是否有提交历史"""
result = self._run(["rev-parse", "HEAD"], check=False)
return result.returncode == 0
def create_branch(self, name: str, start_point: str | None = None) -> None:
"""创建新分支"""
args = ["branch", name]
if start_point:
args.append(start_point)
result = self._run(args, check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"创建分支 {name} 失败", result))
def checkout(self, branch: str) -> None:
"""切换到指定分支"""
result = self._run(["checkout", branch], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"切换到分支 {branch} 失败", result))
def checkout_new_branch(self, name: str, start_point: str | None = None) -> None:
"""创建并切换到新分支"""
args = ["checkout", "-b", name]
if start_point:
args.append(start_point)
result = self._run(args, check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"创建并切换到分支 {name} 失败", result))
def has_commits_since(self, commit: str, branch: str) -> bool:
"""检查指定分支自某提交后是否有新提交"""
result = self._run(["rev-list", f"{commit}..{branch}", "--count"], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"检查分支 {branch} 新提交失败", result))
count = int((result.stdout or "0").strip())
return count > 0
def reset_soft(self, commit: str) -> None:
"""软重置到指定提交"""
result = self._run(["reset", "--soft", commit], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"软重置到 {commit} 失败", result))
def merge_squash(self, branch: str) -> None:
"""squash 合并指定分支"""
result = self._run(["merge", "--squash", branch], check=False)
if result.returncode != 0:
raise FlowError(_format_git_error(f"squash 合并分支 {branch} 失败", result))
def _run(self, args: list[str], check: bool) -> subprocess.CompletedProcess[str]:
return subprocess.run(
["git", *args],

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from aide.core import output
from aide.core.config import ConfigManager
from aide.flow.branch import BranchManager
from aide.flow.errors import FlowError
from aide.flow.git import GitIntegration
from aide.flow.hooks import run_post_commit_hooks, run_pre_commit_hooks
@@ -23,6 +24,7 @@ class FlowTracker:
self.cfg = cfg
self.storage = FlowStorage(root)
self.git = GitIntegration(root)
self.branch_mgr = BranchManager(root, self.git)
def start(self, phase: str, summary: str) -> bool:
return self._run(action="start", to_phase=phase, text=summary)
@@ -61,12 +63,24 @@ class FlowTracker:
assert to_phase is not None
validator.validate_start(to_phase)
self.storage.archive_existing_status()
# 创建任务分支
task_id = now_task_id()
task_branch = self.branch_mgr.create_task_branch(
task_id=task_id,
task_summary=normalized_text,
)
branch_info = self.branch_mgr.get_active_branch_info()
status = FlowStatus(
task_id=now_task_id(),
task_id=task_id,
current_phase=to_phase,
current_step=0,
started_at=now_iso(),
history=[],
source_branch=branch_info.source_branch if branch_info else None,
start_commit=branch_info.start_commit if branch_info else None,
task_branch=task_branch,
)
updated, commit_msg = self._apply_action(
status=status,
@@ -80,7 +94,7 @@ class FlowTracker:
self.storage.save_status(updated)
final_status = self._do_git_commit(updated, commit_msg)
self.storage.save_status(final_status)
output.ok(f"任务开始: {to_phase}")
output.ok(f"任务开始: {to_phase} (分支: {task_branch})")
run_post_commit_hooks(to_phase=to_phase, action=action)
return True
@@ -94,6 +108,14 @@ class FlowTracker:
if action == "next-part":
assert to_phase is not None
validator.validate_next_part(current_phase, to_phase)
# 如果进入 finish 环节,执行分支合并
if to_phase == "finish":
success, merge_msg = self.branch_mgr.finish_branch_merge(
task_summary=normalized_text,
)
if not success:
output.warn(merge_msg)
elif action == "back-part":
assert to_phase is not None
validator.validate_back_part(current_phase, to_phase)
@@ -181,6 +203,9 @@ class FlowTracker:
current_step=next_step,
started_at=status.started_at,
history=history,
source_branch=status.source_branch,
start_commit=status.start_commit,
task_branch=status.task_branch,
)
return updated_status, message
@@ -207,6 +232,9 @@ class FlowTracker:
current_step=status.current_step,
started_at=status.started_at,
history=updated_history,
source_branch=status.source_branch,
start_commit=status.start_commit,
task_branch=status.task_branch,
)
return status

View File

@@ -54,15 +54,26 @@ class FlowStatus:
current_step: int
started_at: str
history: list[HistoryEntry]
# 分支管理相关字段
source_branch: str | None = None
start_commit: str | None = None
task_branch: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
data: dict[str, Any] = {
"task_id": self.task_id,
"current_phase": self.current_phase,
"current_step": self.current_step,
"started_at": self.started_at,
"history": [h.to_dict() for h in self.history],
}
if self.source_branch is not None:
data["source_branch"] = self.source_branch
if self.start_commit is not None:
data["start_commit"] = self.start_commit
if self.task_branch is not None:
data["task_branch"] = self.task_branch
return data
@staticmethod
def from_dict(data: dict[str, Any]) -> "FlowStatus":
@@ -84,6 +95,9 @@ class FlowStatus:
current_step=current_step,
started_at=started_at,
history=history,
source_branch=data.get("source_branch"),
start_commit=data.get("start_commit"),
task_branch=data.get("task_branch"),
)