использование самописного тула вместо Backend

This commit is contained in:
Егор Кандрушин 2026-04-28 13:47:30 +03:00
parent 7907f57971
commit b1e10f25b1
6 changed files with 87 additions and 132 deletions

View file

@ -1,3 +0,0 @@
from src.agent.backends.isolated_shell import IsolatedShellBackend
__all__ = ["IsolatedShellBackend"]

View file

@ -1,124 +0,0 @@
import os
import pwd
import subprocess
from typing import Any
import uuid
from pathlib import Path
from deepagents.backends.local_shell import DEFAULT_EXECUTE_TIMEOUT
from deepagents.backends.protocol import SandboxBackendProtocol
class IsolatedShellBackend(SandboxBackendProtocol):
"""LocalShellBackend с изоляцией shell-команд через отдельного пользователя."""
def __init__(
self,
user: str,
root_dir: str,
timeout: int = DEFAULT_EXECUTE_TIMEOUT,
max_output_bytes: int = 100_000,
env: dict[str, str] | None = None,
inherit_env: bool = False,
):
self._user = user
self._uid = pwd.getpwnam(user).pw_uid # type: ignore[attr-defined]
self._gid = pwd.getpwnam(user).pw_gid # type: ignore[attr-defined]
if timeout <= 0:
msg = f"timeout must be positive, got {timeout}"
raise ValueError(msg)
# Store execution parameters
self._default_timeout = timeout
self._max_output_bytes = max_output_bytes
self.cwd = Path(root_dir).resolve() if root_dir else Path.cwd()
# Build environment based on inherit_env setting
if inherit_env:
self._env = os.environ.copy()
if env is not None:
self._env.update(env)
else:
self._env = env if env is not None else {}
# Generate unique sandbox ID
self._sandbox_id = f"local-{uuid.uuid4().hex[:8]}"
def execute(
self,
command: str,
*,
timeout: int | None = None,
) -> Any:
if not command or not isinstance(command, str):
return type(self)._error_response("Command must be a non-empty string.")
effective_timeout = timeout if timeout is not None else self._default_timeout
if effective_timeout <= 0:
return type(self)._error_response(
f"timeout must be positive, got {effective_timeout}"
)
proc: subprocess.Popen | None = None
try:
print(f"Running shell: {command}")
proc = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=str(self.cwd),
env=self._env,
preexec_fn=lambda: (
os.setgid(self._gid) or os.setuid(self._uid) # type: ignore[attr-defined]
),
)
stdout, stderr = proc.communicate(timeout=effective_timeout)
output_parts = []
if stdout:
output_parts.append(stdout)
if stderr:
stderr_lines = stderr.strip().split("\n")
output_parts.extend(f"[stderr] {line}" for line in stderr_lines)
output = "\n".join(output_parts) if output_parts else "<no output>"
truncated = False
if len(output) > self._max_output_bytes:
output = output[: self._max_output_bytes]
output += f"\n\n... Output truncated at {self._max_output_bytes} bytes."
truncated = True
if proc.returncode != 0:
output = f"{output.rstrip()}\n\nExit code: {proc.returncode}"
result = self._make_response(output, proc.returncode, truncated)
print(result)
return result
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
msg = f"Error: Command timed out after {effective_timeout} seconds."
return self._make_response(msg, 124, False)
except Exception as e:
return self._make_response(
f"Error executing command ({type(e).__name__}): {e}", 1, False
)
@staticmethod
def _error_response(message: str):
from deepagents.backends.protocol import ExecuteResponse
return ExecuteResponse(output=message, exit_code=1, truncated=False)
@staticmethod
def _make_response(output: str, exit_code: int, truncated: bool):
from deepagents.backends.protocol import ExecuteResponse
return ExecuteResponse(output=output, exit_code=exit_code, truncated=truncated)

View file

@ -1,14 +1,13 @@
import os
from deepagents import create_deep_agent, FilesystemPermission
from deepagents.backends import CompositeBackend, FilesystemBackend
from deepagents.backends import CompositeBackend, FilesystemBackend, StateBackend
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.state import CompiledStateGraph
from composio import Composio
from composio_langchain import LangchainProvider
from src.agent.backends import IsolatedShellBackend
from src.agent.tools import send_file
from src.agent.tools import send_file, execute_shell
class Agent(CompiledStateGraph):
@ -31,9 +30,9 @@ def create_agent() -> Agent:
tools = session.tools()
workspace_dir = os.environ["WORKSPACE_DIR"]
agent_user = os.environ.get("AGENT_USER", "agent")
backend = CompositeBackend(
default=StateBackend(),
routes={
workspace_dir: FilesystemBackend(workspace_dir, virtual_mode=True),
}
@ -45,7 +44,7 @@ def create_agent() -> Agent:
model=model,
system_prompt="You are a helpful assistant. Use Composio tools to take action when needed.",
checkpointer=MemorySaver(),
tools=tools + [send_file],
tools=tools + [send_file, execute_shell],
backend=backend,
permissions=[
FilesystemPermission(

View file

@ -0,0 +1,2 @@
from src.agent.tools.send_file import send_file
from src.agent.tools.execute_shell import execute_shell

View file

@ -0,0 +1,81 @@
import os
import subprocess
from dataclasses import dataclass
from typing import Annotated
from langchain_core.tools import tool
DEFAULT_TIMEOUT = 30
DEFAULT_MAX_OUTPUT = 100_000
CWD = os.environ.get("WORKSPACE_DIR", None) or "/"
TOOL_DESCRIPTION = \
f"""
Execute shell command and return formatted output.
Your default working dir is: {CWD}
Args:
command: shell command to execute
timeout: timeout in seconds (None = use default)
Returns:
Formatted output with exit code
"""
@tool(description=TOOL_DESCRIPTION)
def execute_shell(
command: Annotated[str, "Shell command to execute"],
timeout: Annotated[
int | None, f"Timeout in seconds, default is not specified"
] = None,
) -> Annotated[str, "Command output with exit code"]:
# Validate timeout type
if timeout is not None:
if not isinstance(timeout, int):
return "Error: timeout must be an integer"
if timeout < 0:
return f"Error: timeout must be non-negative, got {timeout}"
# Validate command
if not command or not isinstance(command, str):
return "Error: command must be a non-empty string"
# Apply defaults
effective_timeout = DEFAULT_TIMEOUT if timeout is None or timeout == 0 else timeout
cwd = os.environ.get("WORKSPACE_DIR", None) or "/"
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=effective_timeout if effective_timeout > 0 else None,
cwd=cwd,
)
output = result.stdout
if result.stderr:
stderr_lines = result.stderr.strip().split("\n")
output += "\n" + "\n".join(f"[stderr] {line}" for line in stderr_lines)
# Truncate if needed
max_output = DEFAULT_MAX_OUTPUT
if len(output) > max_output:
output = output[:max_output]
output += f"\n\n... Output truncated at {max_output} bytes"
# Add exit code status
status = "succeeded" if result.returncode == 0 else "failed"
output += f"\n\n[Command {status} with exit code {result.returncode}]"
return output
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {effective_timeout} seconds"
except FileNotFoundError:
return f"Error: Command not found"
except PermissionError:
return f"Error: Permission denied"
except Exception as e:
return f"Error executing command ({type(e).__name__}): {e}"