master/adapter/http/fastapi/routers/v1/router.py
2026-04-02 13:41:41 +03:00

91 lines
2.7 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from adapter.di.container import AppContainer
from adapter.http.fastapi.dependencies import (
get_container,
get_create_sandbox,
get_get_user,
)
from adapter.http.fastapi.schemas import (
CreateSandboxRequest,
ErrorResponse,
HealthResponse,
SandboxSessionResponse,
UserResponse,
)
from domain.error import SandboxError, SandboxStartError, UserNotFoundError
from domain.sandbox import SandboxSession
from usecase.sandbox import CreateSandbox, CreateSandboxCommand
from usecase.user import GetUser, GetUserQuery
router = APIRouter()
@router.get(
'/health',
response_model=HealthResponse,
status_code=status.HTTP_200_OK,
)
def health(container: AppContainer = Depends(get_container)) -> HealthResponse:
return HealthResponse(
status='ok',
app=container.config.app.name,
env=container.config.app.env,
)
@router.get(
'/users/{user_id}',
response_model=UserResponse,
responses={status.HTTP_404_NOT_FOUND: {'model': ErrorResponse}},
status_code=status.HTTP_200_OK,
)
def get_user(user_id: str, usecase: GetUser = Depends(get_get_user)) -> UserResponse:
try:
user = usecase.execute(GetUserQuery(user_id=user_id))
except UserNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
return UserResponse(id=user.id, email=user.email, name=user.name)
@router.post(
'/create',
response_model=SandboxSessionResponse,
responses={
status.HTTP_503_SERVICE_UNAVAILABLE: {'model': ErrorResponse},
status.HTTP_500_INTERNAL_SERVER_ERROR: {'model': ErrorResponse},
},
status_code=status.HTTP_200_OK,
)
def create_sandbox(
request: CreateSandboxRequest,
usecase: CreateSandbox = Depends(get_create_sandbox),
) -> SandboxSessionResponse:
try:
session = usecase.execute(CreateSandboxCommand(chat_id=request.chat_id))
except SandboxStartError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=str(exc),
) from exc
except SandboxError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(exc),
) from exc
return _to_sandbox_session_response(session)
def _to_sandbox_session_response(session: SandboxSession) -> SandboxSessionResponse:
return SandboxSessionResponse(
session_id=session.session_id,
chat_id=session.chat_id,
container_id=session.container_id,
status=session.status.value,
expires_at=session.expires_at,
)