сохранение чекпоинтов в sqlite

This commit is contained in:
Егор Кандрушин 2026-04-28 15:07:04 +03:00
parent b1e10f25b1
commit 42e6723abb
7 changed files with 82 additions and 3 deletions

View file

@ -6,8 +6,10 @@ from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.state import CompiledStateGraph
from composio import Composio
from composio_langchain import LangchainProvider
from langgraph.checkpoint.sqlite import SqliteSaver
from src.agent.tools import send_file, execute_shell
from src.agent.checkpointer import get_active_checkpointer
class Agent(CompiledStateGraph):
@ -38,14 +40,16 @@ def create_agent() -> Agent:
}
)
checkpointer = get_active_checkpointer()
# noinspection PyTypeChecker
# create_deep_agent возвращает CompiledStateGraph, но ниже мы его дополняем так, чтобы он соответствовал сигнатуре Agent
agent: Agent = create_deep_agent(
model=model,
system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.",
checkpointer=MemorySaver(),
tools=tools + [send_file, execute_shell],
backend=backend,
checkpointer=checkpointer,
permissions=[
FilesystemPermission(
operations=["read", "write"],

27
src/agent/checkpointer.py Normal file
View file

@ -0,0 +1,27 @@
from contextlib import asynccontextmanager
import os
from typing import AsyncIterable
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from pathlib import Path
_instance: AsyncSqliteSaver | None = None
def get_active_checkpointer() -> AsyncSqliteSaver:
if not _instance:
raise RuntimeError("Checkpointer not initialized")
return _instance
@asynccontextmanager
async def create_checkpointer() -> AsyncIterable[AsyncSqliteSaver]:
global _instance
internal_data_dir = os.environ["INTERNAL_DATA_DIR"]
filepath = Path(internal_data_dir) / "checkpoint.sqlite"
async with AsyncSqliteSaver.from_conn_string(filepath) as saver:
_instance = saver
yield saver
_instance = None

View file

@ -4,12 +4,14 @@ from fastapi import FastAPI
from src.api.external import router as ws_router
from src.agent import AgentService
from src.agent.checkpointer import create_checkpointer
@asynccontextmanager
async def lifespan(app: FastAPI):
AgentService() # инициализируем синглтон
yield
async with create_checkpointer():
AgentService() # инициализируем синглтон
yield
app = FastAPI(lifespan=lifespan)