Files
agent-aide/aide-program/aide/flow/storage.py
2025-12-19 02:30:45 +08:00

191 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""状态文件读写:锁、原子写入、归档。"""
from __future__ import annotations
import json
import os
import secrets
import time
from contextlib import contextmanager
from pathlib import Path
from aide.flow.errors import FlowError
from aide.flow.types import FlowStatus
from aide.flow.utils import now_iso, now_task_id
class FlowStorage:
    """Filesystem-backed storage for flow state kept under ``<root>/.aide``.

    Covers three concerns: an exclusive lock file guarding the status file,
    atomic writes of the current status, and archiving of finished statuses
    into the ``logs`` sub-directory. A small "back-confirm" state file is
    managed alongside.
    """

    def __init__(self, root: Path):
        """Derive all storage paths from *root*; nothing is created on disk."""
        self.root = root
        self.aide_dir = self.root / ".aide"
        self.status_path = self.aide_dir / "flow-status.json"
        self.lock_path = self.aide_dir / "flow-status.lock"
        self.tmp_path = self.aide_dir / "flow-status.json.tmp"
        self.logs_dir = self.aide_dir / "logs"
        self.back_confirm_path = self.aide_dir / "back-confirm-state.json"

    # === Internal helpers ===

    @staticmethod
    def _read_json(path: Path):
        """Return the parsed JSON content of the UTF-8 file at *path*."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _atomic_write_text(target: Path, tmp: Path, payload: str) -> None:
        """Write *payload* to *tmp*, then rename it over *target*.

        Readers therefore see either the old or the new content, never a
        partially written file. On failure the temp file is removed
        (best effort) and the original exception is re-raised.
        """
        try:
            tmp.write_text(payload, encoding="utf-8")
            os.replace(tmp, target)
        except Exception:
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass  # cleanup is best effort; the original error matters more
            raise

    @staticmethod
    def _task_summary(status: FlowStatus, *, is_current: bool) -> dict:
        """Build the summary dict that ``list_all_tasks`` returns per task."""
        return {
            "task_id": status.task_id,
            "phase": status.current_phase,
            "started_at": status.started_at,
            "summary": status.history[0].summary if status.history else "",
            "is_current": is_current,
        }

    # === Status file ===

    def ensure_ready(self) -> None:
        """Check that ``.aide`` exists and create the ``logs`` directory.

        Raises:
            FlowError: if the ``.aide`` directory is missing.
        """
        if not self.aide_dir.exists():
            raise FlowError("未找到 .aide 目录请先运行aide init")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def lock(self, timeout_seconds: float = 3.0, poll_seconds: float = 0.2):
        """Hold an exclusive lock file for the duration of the ``with`` body.

        The lock is acquired by creating ``flow-status.lock`` with
        ``O_CREAT | O_EXCL``; while the file already exists, creation is
        retried every *poll_seconds* until *timeout_seconds* have elapsed.
        The current PID is written into the file. On exit the descriptor is
        closed and the lock file removed.

        Raises:
            FlowError: if ``.aide`` is missing or the lock cannot be acquired
                within the timeout.
        """
        self.ensure_ready()
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + timeout_seconds
        while True:
            try:
                fd = os.open(
                    str(self.lock_path),
                    os.O_CREAT | os.O_EXCL | os.O_WRONLY,
                )
                break
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise FlowError(
                        "状态文件被占用，请稍后重试或删除 .aide/flow-status.lock"
                    ) from None
                time.sleep(poll_seconds)
        try:
            # Inside the try so that a failed write still releases the lock.
            os.write(fd, str(os.getpid()).encode("utf-8"))
            yield
        finally:
            try:
                os.close(fd)
            except OSError:
                pass  # releasing is best effort
            try:
                self.lock_path.unlink(missing_ok=True)
            except OSError:
                pass  # releasing is best effort

    def load_status(self) -> FlowStatus | None:
        """Load the current status, or return ``None`` if no status file exists.

        Raises:
            FlowError: if the file cannot be read, is not valid JSON, is not a
                JSON object, or is rejected by ``FlowStatus.from_dict``.
        """
        if not self.status_path.exists():
            return None
        try:
            data = self._read_json(self.status_path)
            if not isinstance(data, dict):
                raise ValueError("状态文件顶层必须为对象")
            return FlowStatus.from_dict(data)
        except Exception as exc:
            raise FlowError(f"状态文件解析失败: {exc}") from exc

    def save_status(self, status: FlowStatus) -> None:
        """Atomically write *status* to the status file as indented JSON.

        Raises:
            FlowError: if writing the temp file or renaming it fails.
        """
        payload = json.dumps(status.to_dict(), ensure_ascii=False, indent=2) + "\n"
        try:
            self._atomic_write_text(self.status_path, self.tmp_path, payload)
        except Exception as exc:
            raise FlowError(f"写入状态文件失败: {exc}") from exc

    def archive_existing_status(self) -> None:
        """Move the current status file into ``logs`` as ``flow-status.<id>.json``.

        ``<id>`` is the task id stored in the file; if the file cannot be
        parsed, the value of ``now_task_id()`` is used instead so the
        unreadable file is still moved out of the way. Does nothing when there
        is no status file.

        Raises:
            FlowError: if the file cannot be moved.
        """
        if not self.status_path.exists():
            return
        suffix = now_task_id()
        try:
            current = self.load_status()
        except FlowError:
            current = None
        else:
            if current is None:
                # The file vanished between the check above and the load:
                # there is nothing left to archive.
                return
        if current is not None:
            suffix = current.task_id
        target = self.logs_dir / f"flow-status.{suffix}.json"
        try:
            os.replace(self.status_path, target)
        except Exception as exc:
            raise FlowError(f"归档旧状态失败: {exc}") from exc

    def list_all_tasks(self) -> list[dict]:
        """List summaries of all tasks (current + archived).

        Each entry has the keys ``task_id``, ``phase``, ``started_at``,
        ``summary`` and ``is_current``. Entries are sorted by ``task_id`` in
        descending order. Archived files that cannot be read or parsed are
        skipped.

        Raises:
            FlowError: if the current status file exists but cannot be loaded.
        """
        tasks = []

        # Current task
        current = self.load_status()
        if current is not None:
            tasks.append(self._task_summary(current, is_current=True))

        # Archived tasks
        if self.logs_dir.exists():
            for archive in self.logs_dir.glob("flow-status.*.json"):
                try:
                    status = FlowStatus.from_dict(self._read_json(archive))
                    tasks.append(self._task_summary(status, is_current=False))
                except Exception:
                    # A single broken archive must not hide the other tasks.
                    continue

        # Descending by task_id (the original comment notes that task_id is
        # in a timestamp format).
        tasks.sort(key=lambda item: item["task_id"], reverse=True)
        return tasks

    def load_task_by_id(self, task_id: str) -> FlowStatus | None:
        """Load the task with *task_id* from the current status or the archive.

        Returns ``None`` when neither the current status nor an archive file
        matches.

        Raises:
            FlowError: if the current status file or the matching archive file
                cannot be loaded.
        """
        # Check the current task first
        current = self.load_status()
        if current is not None and current.task_id == task_id:
            return current

        # Then the archive
        archive_path = self.logs_dir / f"flow-status.{task_id}.json"
        if not archive_path.exists():
            return None
        try:
            data = self._read_json(archive_path)
            if not isinstance(data, dict):
                raise ValueError("状态文件顶层必须为对象")
            return FlowStatus.from_dict(data)
        except Exception as exc:
            raise FlowError(f"读取归档任务失败: {exc}") from exc

    # === Back-confirm state management ===

    def has_pending_back_confirm(self) -> bool:
        """Return whether a back-confirm state file exists."""
        return self.back_confirm_path.exists()

    def load_back_confirm_state(self) -> dict | None:
        """Load the back-confirm state, or ``None`` if no state file exists.

        Raises:
            FlowError: if the file cannot be read or is not a JSON object.
        """
        if not self.back_confirm_path.exists():
            return None
        try:
            data = self._read_json(self.back_confirm_path)
            if not isinstance(data, dict):
                raise ValueError("back-confirm 状态文件格式错误")
            return data
        except Exception as exc:
            raise FlowError(f"读取 back-confirm 状态失败: {exc}") from exc

    def save_back_confirm_state(self, target_part: str, reason: str) -> str:
        """Persist a new back-confirm request and return its generated key.

        The key is a random 12-character hex string. The file is written
        atomically, like the status file.

        Raises:
            FlowError: if the state cannot be serialized or written.
        """
        key = secrets.token_hex(6)  # 6 random bytes -> 12 hex characters
        data = {
            "pending_key": key,
            "target_part": target_part,
            "reason": reason,
            "created_at": now_iso(),
        }
        tmp = self.back_confirm_path.with_name(self.back_confirm_path.name + ".tmp")
        try:
            payload = json.dumps(data, ensure_ascii=False, indent=2) + "\n"
            self._atomic_write_text(self.back_confirm_path, tmp, payload)
        except Exception as exc:
            raise FlowError(f"保存 back-confirm 状态失败: {exc}") from exc
        return key

    def clear_back_confirm_state(self) -> None:
        """Delete the back-confirm state file; a missing file is not an error.

        Raises:
            FlowError: if the file exists but cannot be removed.
        """
        try:
            self.back_confirm_path.unlink(missing_ok=True)
        except Exception as exc:
            raise FlowError(f"清除 back-confirm 状态失败: {exc}") from exc