[fix] reduce duplicate fastapi telemetry
This commit is contained in:
parent
0829ad6c12
commit
9ea3414bc0
6 changed files with 70 additions and 85 deletions
|
|
@ -56,7 +56,7 @@ def build_container(
|
|||
observability = build_observability(app_config)
|
||||
|
||||
user_repository = InMemoryUserRepository(
|
||||
[User(id='123', email='aza@gglamer.ru', name='gglamer')]
|
||||
observability.tracer, [User(id='123', email='aza@gglamer.ru', name='gglamer')]
|
||||
)
|
||||
repositories = AppRepositories(user=user_repository)
|
||||
usecases = AppUsecases(
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
|
|||
from collections.abc import Callable
|
||||
|
||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||
|
||||
from adapter.config.loader import load_config
|
||||
from adapter.config.model import AppConfig
|
||||
from adapter.http.fastapi.dependencies import APP_CONFIG_STATE
|
||||
from adapter.http.fastapi.lifespan import app_lifespan
|
||||
from adapter.di.container import AppContainer, build_container
|
||||
from adapter.http.fastapi.dependencies import APP_CONFIG_STATE, APP_CONTAINER_STATE
|
||||
from adapter.http.fastapi.middleware import register_middleware
|
||||
from adapter.http.fastapi.routers.v1.router import router as v1_router
|
||||
from fastapi import FastAPI
|
||||
|
|
@ -11,8 +15,51 @@ API_V1_PREFIX = '/api/v1'
|
|||
|
||||
def create_app(config: AppConfig | None = None) -> FastAPI:
|
||||
app_config = load_config() if config is None else config
|
||||
app = FastAPI(title=app_config.app.name, lifespan=app_lifespan)
|
||||
container = build_container(config=app_config)
|
||||
app: FastAPI | None = None
|
||||
|
||||
try:
|
||||
app = FastAPI(title=app_config.app.name)
|
||||
setattr(app.state, APP_CONFIG_STATE, app_config)
|
||||
setattr(app.state, APP_CONTAINER_STATE, container)
|
||||
app.add_event_handler('shutdown', _build_shutdown_handler(app, container))
|
||||
register_middleware(app, app_config)
|
||||
app.include_router(v1_router, prefix=API_V1_PREFIX)
|
||||
|
||||
FastAPIInstrumentor.instrument_app(
|
||||
app,
|
||||
tracer_provider=container.observability.tracer_provider,
|
||||
meter_provider=container.observability.meter_provider,
|
||||
exclude_spans=['send', 'receive'],
|
||||
)
|
||||
|
||||
return app
|
||||
except Exception:
|
||||
try:
|
||||
if app is not None:
|
||||
_uninstrument_app(app)
|
||||
finally:
|
||||
container.shutdown()
|
||||
raise
|
||||
|
||||
|
||||
def _build_shutdown_handler(
|
||||
app: FastAPI,
|
||||
container: AppContainer,
|
||||
) -> Callable[[], None]:
|
||||
def shutdown() -> None:
|
||||
try:
|
||||
_uninstrument_app(app)
|
||||
finally:
|
||||
container.shutdown()
|
||||
|
||||
return shutdown
|
||||
|
||||
|
||||
def _uninstrument_app(app: FastAPI) -> None:
|
||||
if _is_instrumented(app):
|
||||
FastAPIInstrumentor.uninstrument_app(app)
|
||||
|
||||
|
||||
def _is_instrumented(app: FastAPI) -> bool:
|
||||
return bool(getattr(app, '_is_instrumented_by_opentelemetry', False))
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||
|
||||
from adapter.di.container import build_container
|
||||
from adapter.http.fastapi.dependencies import APP_CONFIG_STATE, APP_CONTAINER_STATE
|
||||
from fastapi import FastAPI
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def app_lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
config = getattr(app.state, APP_CONFIG_STATE, None)
|
||||
if config is None:
|
||||
raise RuntimeError('config unavailable')
|
||||
|
||||
container = build_container(config=config)
|
||||
instrumented = False
|
||||
setattr(app.state, APP_CONTAINER_STATE, container)
|
||||
|
||||
try:
|
||||
tracer_provider = container.observability.tracer_provider
|
||||
if tracer_provider is not None:
|
||||
FastAPIInstrumentor.instrument_app(
|
||||
app,
|
||||
tracer_provider=tracer_provider,
|
||||
)
|
||||
instrumented = True
|
||||
yield
|
||||
finally:
|
||||
if instrumented:
|
||||
FastAPIInstrumentor.uninstrument_app(app)
|
||||
container.shutdown()
|
||||
|
|
@ -4,9 +4,6 @@ from adapter.config.model import AppConfig
|
|||
from adapter.http.fastapi.dependencies import get_container
|
||||
from fastapi import FastAPI, Request, Response
|
||||
|
||||
REQUEST_COUNT = 'http.server.request.count'
|
||||
REQUEST_DURATION = 'http.server.request.duration'
|
||||
|
||||
|
||||
def register_middleware(app: FastAPI, config: AppConfig) -> None:
|
||||
@app.middleware('http')
|
||||
|
|
@ -24,44 +21,13 @@ def register_middleware(app: FastAPI, config: AppConfig) -> None:
|
|||
finally:
|
||||
duration_ms = (perf_counter() - start) * 1000
|
||||
container = get_container(request)
|
||||
container.observability.logger.info(
|
||||
'http_request',
|
||||
attrs={
|
||||
attrs: dict[str, str | int | float | bool] = {
|
||||
'http.method': request.method,
|
||||
'http.path': request.url.path,
|
||||
'http.status_code': status_code,
|
||||
'http.duration_ms': duration_ms,
|
||||
},
|
||||
)
|
||||
|
||||
if not config.metrics.enabled:
|
||||
return
|
||||
|
||||
@app.middleware('http')
|
||||
async def metrics_middleware(
|
||||
request: Request,
|
||||
call_next,
|
||||
) -> Response:
|
||||
start = perf_counter()
|
||||
status_code = 500
|
||||
|
||||
try:
|
||||
response = await call_next(request)
|
||||
status_code = response.status_code
|
||||
return response
|
||||
finally:
|
||||
duration_ms = (perf_counter() - start) * 1000
|
||||
container = get_container(request)
|
||||
route = request.scope.get('route')
|
||||
path = getattr(route, 'path', None)
|
||||
attrs: dict[str, str | int] = {
|
||||
'http.method': request.method,
|
||||
'http.path': path if isinstance(path, str) and path else 'unmatched',
|
||||
'http.status_code': status_code,
|
||||
}
|
||||
container.observability.metrics.increment(REQUEST_COUNT, attrs=attrs)
|
||||
container.observability.metrics.record(
|
||||
REQUEST_DURATION,
|
||||
duration_ms,
|
||||
container.observability.logger.info(
|
||||
'http_request',
|
||||
attrs=attrs,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,14 +1,20 @@
|
|||
from collections.abc import Iterable
|
||||
|
||||
from domain.user import User
|
||||
from usecase.interface import UserRepository
|
||||
from usecase.interface import Tracer, UserRepository
|
||||
|
||||
|
||||
class InMemoryUserRepository(UserRepository):
|
||||
def __init__(self, users: Iterable[User] | None = None) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
tracer: Tracer,
|
||||
users: Iterable[User] | None = None,
|
||||
) -> None:
|
||||
self._users = {user.id: user for user in users or ()}
|
||||
self._tracer = tracer
|
||||
|
||||
def get(self, user_id: str) -> User | None:
|
||||
with self._tracer.start_span('repository.user', attrs={'user.id': user_id}):
|
||||
return self._users.get(user_id)
|
||||
|
||||
def get_by_email(self, email: str) -> User | None:
|
||||
|
|
|
|||
|
|
@ -37,5 +37,4 @@ class GetUser:
|
|||
raise error
|
||||
|
||||
span.set_attribute('user.email', user.email)
|
||||
self._logger.info('user_loaded', attrs={'user_id': user.id})
|
||||
return user
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue