返回 Skill 列表
extension
分类: 安全与合规需要 API Key

FastAPI Production Engineering

构建、部署和扩展生产级 FastAPI 应用,包含异步编程、类型安全设计、结构化错误处理、JWT 认证、异步 ORM、自动文档、测试和可观测性。

person作者: 1kalinhubclawhub

FastAPI Production Engineering

Complete methodology for building, deploying, and scaling production FastAPI applications. Not a tutorial — a production operating system.

Quick Health Check (/16)

Score 2 points each. Total < 8 = critical work needed.

| Signal | Healthy | Unhealthy | |--------|---------|-----------| | Type safety | Pydantic v2 models everywhere | dict returns, no validation | | Error handling | Structured error hierarchy | Bare HTTPException strings | | Auth | JWT + dependency injection | Manual token parsing | | Testing | 80%+ coverage, async tests | No tests or sync-only | | Database | Async ORM, migrations | Raw SQL, no migrations | | Observability | Structured logging + tracing | print() debugging | | Deployment | Multi-stage Docker, health checks | uvicorn main:app on bare metal | | Documentation | Auto-generated, accurate OpenAPI | Default /docs untouched |

Phase 1: Project Architecture

Recommended Structure

src/
├── app/
│   ├── __init__.py
│   ├── main.py              # App factory
│   ├── config.py             # Pydantic Settings
│   ├── dependencies.py       # Shared DI
│   ├── middleware.py          # Custom middleware
│   ├── features/
│   │   ├── users/
│   │   │   ├── __init__.py
│   │   │   ├── router.py     # Endpoints
│   │   │   ├── schemas.py    # Pydantic models
│   │   │   ├── service.py    # Business logic
│   │   │   ├── repository.py # Data access
│   │   │   ├── models.py     # SQLAlchemy/SQLModel
│   │   │   ├── dependencies.py
│   │   │   └── exceptions.py
│   │   ├── auth/
│   │   ├── orders/
│   │   └── ...
│   ├── core/
│   │   ├── database.py       # Engine, session factory
│   │   ├── security.py       # JWT, hashing
│   │   ├── errors.py         # Error hierarchy
│   │   └── logging.py        # Structlog config
│   └── shared/
│       ├── pagination.py
│       ├── filters.py
│       └── responses.py
├── migrations/               # Alembic
├── tests/
│   ├── conftest.py
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── pyproject.toml
├── Dockerfile
└── docker-compose.yml

7 Architecture Rules

  1. Feature-based modules — group by domain, not by layer
  2. Router → Service → Repository — strict layering, no skipping
  3. Dependency injection everywhere — use Depends() for testability
  4. Pydantic models at boundaries — validate all input AND output
  5. No business logic in routers — routers are thin, services are thick
  6. Config via environment — Pydantic Settings with .env support
  7. Async by default — use async def for all I/O-bound operations

Framework Selection Context

# When to choose FastAPI over alternatives
fastapi_is_best_when:
  - "You need auto-generated OpenAPI docs"
  - "Team knows Python type hints"
  - "API-first (no server-rendered HTML as primary)"
  - "High concurrency with async I/O"
  - "Microservice or API gateway"

consider_alternatives:
  django: "Full-featured web app with admin, ORM, auth batteries"
  flask: "Simple app, team prefers explicit over magic"
  litestar: "Need WebSocket-heavy or more opinionated framework"
  hono_or_express: "Team prefers TypeScript"

Phase 2: Configuration & Environment

Pydantic Settings Pattern

from pydantic_settings import BaseSettings
from pydantic import SecretStr, field_validator
from functools import lru_cache

class Settings(BaseSettings):
    # App
    app_name: str = "MyAPI"
    debug: bool = False
    environment: str = "production"  # development | staging | production
    
    # Server
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 4
    
    # Database
    database_url: SecretStr  # Required — no default
    db_pool_size: int = 20
    db_max_overflow: int = 10
    db_pool_timeout: int = 30
    
    # Auth
    jwt_secret: SecretStr  # Required
    jwt_algorithm: str = "HS256"
    jwt_expire_minutes: int = 30
    
    # Redis
    redis_url: str = "redis://localhost:6379/0"
    
    # CORS
    cors_origins: list[str] = ["http://localhost:3000"]
    
    @field_validator("environment")
    @classmethod
    def validate_environment(cls, v: str) -> str:
        allowed = {"development", "staging", "production"}
        if v not in allowed:
            raise ValueError(f"environment must be one of {allowed}")
        return v
    
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

@lru_cache
def get_settings() -> Settings:
    return Settings()

5 Configuration Rules

  1. Never hardcode secrets — use SecretStr for sensitive values
  2. Fail fast — required fields have no defaults; app won't start without them
  3. Validate at startup — use @field_validator for constraint checking
  4. Cache settings@lru_cache ensures single parse
  5. Type everything — no str for structured values; use enums, Literal types

Phase 3: Pydantic v2 Mastery

Schema Design Patterns

from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime
from uuid import UUID

# Base with common config
class AppSchema(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,      # ORM mode
        str_strip_whitespace=True,  # Auto-strip
        validate_default=True,      # Validate defaults too
    )

# Input schemas (what the API accepts)
class UserCreate(AppSchema):
    email: str = Field(..., pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
    name: str = Field(..., min_length=1, max_length=100)
    password: str = Field(..., min_length=8, max_length=128)

class UserUpdate(AppSchema):
    name: str | None = Field(None, min_length=1, max_length=100)
    email: str | None = Field(None, pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")

# Output schemas (what the API returns)
class UserResponse(AppSchema):
    id: UUID
    email: str
    name: str
    created_at: datetime
    # Note: password is NEVER in response schema

# List response with pagination
class PaginatedResponse[T](AppSchema):
    items: list[T]
    total: int
    page: int
    page_size: int
    has_next: bool

8 Pydantic Rules

  1. Separate Create/Update/Response schemas — never reuse input as output
  2. Never expose internal fields — no passwords, internal IDs, or debug info in responses
  3. Use Field() for constraints — min/max length, regex patterns, gt/lt for numbers
  4. Enable from_attributes=True — for ORM model → schema conversion
  5. Use generics for wrappersPaginatedResponse[T], ApiResponse[T]
  6. Validate at boundaries — request body, query params, path params, headers
  7. Use computed fields@computed_field for derived values
  8. Document with examplesmodel_config = {"json_schema_extra": {"examples": [...]}}

Phase 4: Error Handling Architecture

Structured Error Hierarchy

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.status import (
    HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED,
    HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND,
    HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY,
    HTTP_429_TOO_MANY_REQUESTS, HTTP_500_INTERNAL_SERVER_ERROR,
)

class AppError(Exception):
    """Base application error."""
    def __init__(
        self,
        message: str,
        code: str,
        status_code: int = HTTP_500_INTERNAL_SERVER_ERROR,
        details: dict | None = None,
    ):
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}
        super().__init__(message)

class NotFoundError(AppError):
    def __init__(self, resource: str, identifier: str | int):
        super().__init__(
            message=f"{resource} not found: {identifier}",
            code="NOT_FOUND",
            status_code=HTTP_404_NOT_FOUND,
            details={"resource": resource, "identifier": str(identifier)},
        )

class ConflictError(AppError):
    def __init__(self, message: str, field: str | None = None):
        super().__init__(
            message=message, code="CONFLICT",
            status_code=HTTP_409_CONFLICT,
            details={"field": field} if field else {},
        )

class AuthenticationError(AppError):
    def __init__(self, message: str = "Invalid credentials"):
        super().__init__(message=message, code="UNAUTHORIZED", status_code=HTTP_401_UNAUTHORIZED)

class AuthorizationError(AppError):
    def __init__(self, message: str = "Insufficient permissions"):
        super().__init__(message=message, code="FORBIDDEN", status_code=HTTP_403_FORBIDDEN)

class ValidationError(AppError):
    def __init__(self, message: str, errors: list[dict] | None = None):
        super().__init__(
            message=message, code="VALIDATION_ERROR",
            status_code=HTTP_422_UNPROCESSABLE_ENTITY,
            details={"errors": errors or []},
        )

class RateLimitError(AppError):
    def __init__(self, retry_after: int = 60):
        super().__init__(
            message="Rate limit exceeded", code="RATE_LIMITED",
            status_code=HTTP_429_TOO_MANY_REQUESTS,
            details={"retry_after": retry_after},
        )

# Global error handler
async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "details": exc.details,
            }
        },
    )

# Register in app factory
# app.add_exception_handler(AppError, app_error_handler)

6 Error Handling Rules

  1. Never return bare strings — always structured {"error": {"code", "message", "details"}}
  2. Use domain-specific errorsNotFoundError("User", user_id) not HTTPException(404)
  3. Global handler catches all — register AppError handler in app factory
  4. Log server errors, don't expose — 5xx returns generic message, logs full traceback
  5. Include actionable details — which field failed, what's allowed, retry-after for rate limits
  6. Never leak internals — no stack traces, SQL queries, or file paths in responses

Phase 5: Authentication & Authorization

JWT + Dependency Injection Pattern

from fastapi import Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone

security = HTTPBearer()

def create_access_token(user_id: str, roles: list[str], settings: Settings) -> str:
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expire_minutes)
    payload = {
        "sub": user_id,
        "roles": roles,
        "exp": expire,
        "iat": datetime.now(timezone.utc),
    }
    return jwt.encode(payload, settings.jwt_secret.get_secret_value(), algorithm=settings.jwt_algorithm)

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    settings: Settings = Depends(get_settings),
    db: AsyncSession = Depends(get_db),
) -> User:
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret.get_secret_value(),
            algorithms=[settings.jwt_algorithm],
        )
        user_id = payload.get("sub")
        if not user_id:
            raise AuthenticationError("Invalid token payload")
    except JWTError:
        raise AuthenticationError("Invalid or expired token")
    
    user = await db.get(User, user_id)
    if not user:
        raise AuthenticationError("User not found")
    return user

# Role-based authorization
def require_role(*roles: str):
    async def checker(user: User = Depends(get_current_user)) -> User:
        if not any(r in user.roles for r in roles):
            raise AuthorizationError(f"Requires one of: {', '.join(roles)}")
        return user
    return checker

# Usage in router
@router.get("/admin/users")
async def list_users(
    admin: User = Depends(require_role("admin", "superadmin")),
    service: UserService = Depends(get_user_service),
):
    return await service.list_all()

10-Point Security Checklist

| # | Check | Priority | |---|-------|----------| | 1 | JWT secret ≥ 256 bits, from env | P0 | | 2 | Token expiry ≤ 30 min for access, ≤ 7 days refresh | P0 | | 3 | Password hashed with bcrypt/argon2 | P0 | | 4 | CORS configured per environment | P0 | | 5 | Rate limiting on auth endpoints | P0 | | 6 | HTTPS enforced (redirect HTTP) | P0 | | 7 | Security headers (HSTS, CSP, X-Frame) | P1 | | 8 | Input validation on ALL endpoints | P1 | | 9 | SQL injection prevented (parameterized queries) | P0 | | 10 | Dependency scanning (safety/pip-audit) | P1 |

Phase 6: Database Patterns

Async SQLAlchemy + Repository Pattern

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select, func
from uuid import uuid4, UUID
from datetime import datetime, timezone

# Engine setup
engine = create_async_engine(
    settings.database_url.get_secret_value(),
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_pre_ping=True,  # Check connection health
    echo=settings.debug,
)

SessionFactory = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with SessionFactory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Base model with common fields
class Base(DeclarativeBase):
    pass

class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
    updated_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

# Repository pattern
class BaseRepository[T]:
    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model
    
    async def get_by_id(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)
    
    async def get_or_raise(self, id: UUID) -> T:
        entity = await self.get_by_id(id)
        if not entity:
            raise NotFoundError(self.model.__name__, str(id))
        return entity
    
    async def list(
        self, *, offset: int = 0, limit: int = 20, **filters
    ) -> tuple[list[T], int]:
        query = select(self.model)
        count_query = select(func.count()).select_from(self.model)
        
        for field, value in filters.items():
            if value is not None:
                query = query.where(getattr(self.model, field) == value)
                count_query = count_query.where(getattr(self.model, field) == value)
        
        total = await self.session.scalar(count_query) or 0
        result = await self.session.execute(
            query.offset(offset).limit(limit).order_by(self.model.created_at.desc())
        )
        return list(result.scalars().all()), total
    
    async def create(self, entity: T) -> T:
        self.session.add(entity)
        await self.session.flush()
        return entity
    
    async def delete(self, entity: T) -> None:
        await self.session.delete(entity)

ORM Selection Guide

| ORM | Best For | Async | Type Safety | Learning Curve | |-----|----------|-------|-------------|----------------| | SQLAlchemy 2.0 | Complex queries, enterprise | ✅ | ✅ Mapped[] | Medium | | SQLModel | Simple CRUD, Pydantic sync | ✅ | ✅ | Low | | Tortoise | Django-like feel | ✅ | Partial | Low | | Piccolo | Modern, migrations built-in | ✅ | ✅ | Low |

Recommendation: SQLAlchemy 2.0 for production. SQLModel for prototypes.

Migration Strategy (Alembic)

# Setup
alembic init migrations
# Edit alembic.ini: sqlalchemy.url = from env

# Generate migration
alembic revision --autogenerate -m "add users table"

# Apply
alembic upgrade head

# Rollback
alembic downgrade -1

Migration Rules:

  1. Always review autogenerated migrations before applying
  2. Never edit applied migrations — create new ones
  3. Test migrations in staging before production
  4. Include downgrade() for every upgrade()
  5. Use batch_alter_table for SQLite compatibility

Phase 7: Testing Strategy

Test Pyramid

| Level | Coverage Target | Tools | Focus | |-------|----------------|-------|-------| | Unit | 80%+ | pytest, unittest.mock | Service logic, validators | | Integration | Key paths | pytest-asyncio, testcontainers | DB queries, external APIs | | E2E | Critical flows | httpx.AsyncClient | Full request→response | | Contract | API boundaries | schemathesis | OpenAPI compliance |

Test Patterns

import pytest
from httpx import AsyncClient, ASGITransport
from app.main import create_app

@pytest.fixture
async def app():
    app = create_app()
    yield app

@pytest.fixture
async def client(app):
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.fixture
async def auth_client(client, test_user):
    token = create_access_token(test_user.id, test_user.roles)
    client.headers["Authorization"] = f"Bearer {token}"
    return client

# E2E test
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
    response = await client.post("/api/users", json={
        "email": "test@example.com",
        "name": "Test User",
        "password": "securepass123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "password" not in data  # Never expose

# Unit test (service layer)
@pytest.mark.asyncio
async def test_user_service_duplicate_email(user_service, mock_repo):
    mock_repo.get_by_email.return_value = existing_user
    with pytest.raises(ConflictError, match="Email already registered"):
        await user_service.create(UserCreate(email="taken@example.com", ...))

# Parametrized validation
@pytest.mark.parametrize("email,expected", [
    ("valid@example.com", True),
    ("invalid", False),
    ("", False),
    ("a@b.c", True),
])
def test_email_validation(email, expected):
    if expected:
        UserCreate(email=email, name="Test", password="12345678")
    else:
        with pytest.raises(ValidationError):
            UserCreate(email=email, name="Test", password="12345678")

7 Testing Rules

  1. Test services, not routers — business logic lives in services
  2. Use fixtures for DI override — swap real DB with test DB via app.dependency_overrides
  3. One assertion per test — clear what broke when it fails
  4. Test error paths — 40% of tests should be sad-path
  5. Use factories for test dataUserFactory.create() not manual dict construction
  6. Async tests need @pytest.mark.asyncio — or set asyncio_mode = "auto" in config
  7. Run tests in CI — block merge if tests fail

Phase 8: Structured Logging & Observability

Structlog Setup

import structlog
from uuid import uuid4
from starlette.middleware.base import BaseHTTPMiddleware

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Request ID middleware
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid4()))
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
        )
        
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        
        logger.info(
            "request_completed",
            status_code=response.status_code,
        )
        return response

Health Check Endpoints

@router.get("/health")
async def health():
    """Liveness probe — is the process running?"""
    return {"status": "ok"}

@router.get("/ready")
async def ready(db: AsyncSession = Depends(get_db)):
    """Readiness probe — can we serve traffic?"""
    checks = {}
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception:
        checks["database"] = "error"
    
    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ok" if all_ok else "degraded", "checks": checks},
    )

Phase 9: Performance Optimization

Priority Stack

| # | Technique | Impact | Effort | |---|-----------|--------|--------| | 1 | Async database queries | High | Low | | 2 | Connection pooling (tuned) | High | Low | | 3 | Response caching (Redis) | High | Medium | | 4 | Background tasks for heavy work | High | Low | | 5 | Pagination on all list endpoints | Medium | Low | | 6 | Select only needed columns | Medium | Low | | 7 | Eager loading (joinedload) | Medium | Medium | | 8 | Rate limiting | Medium | Low |

Background Tasks

from fastapi import BackgroundTasks

@router.post("/users", status_code=201)
async def create_user(
    user_in: UserCreate,
    background_tasks: BackgroundTasks,
    service: UserService = Depends(get_user_service),
):
    user = await service.create(user_in)
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return user

Caching Pattern

from redis.asyncio import Redis
import json

class CacheService:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def get_or_set(self, key: str, factory, ttl: int = 300):
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        result = await factory()
        await self.redis.setex(key, ttl, json.dumps(result, default=str))
        return result
    
    async def invalidate(self, pattern: str):
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)

Phase 10: Production Deployment

Multi-Stage Dockerfile

# Build stage
FROM python:3.12-slim AS builder
WORKDIR /app

RUN pip install --no-cache-dir uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-editable

# Production stage
FROM python:3.12-slim
WORKDIR /app

RUN adduser --disabled-password --no-create-home appuser

COPY --from=builder /app/.venv /app/.venv
COPY src/ ./src/
COPY migrations/ ./migrations/
COPY alembic.ini ./

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

USER appuser
EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD ["python", "-c", "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"]

CMD ["uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

App Factory Pattern

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("starting_up", environment=settings.environment)
    await init_db()
    yield
    # Shutdown
    logger.info("shutting_down")
    await engine.dispose()

def create_app() -> FastAPI:
    settings = get_settings()
    
    app = FastAPI(
        title=settings.app_name,
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url=None,
    )
    
    # Middleware (order matters — last added = first executed)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(RequestIDMiddleware)
    
    # Error handlers
    app.add_exception_handler(AppError, app_error_handler)
    
    # Routers
    app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
    app.include_router(users_router, prefix="/api/users", tags=["users"])
    app.include_router(health_router, tags=["health"])
    
    return app

app = create_app()

GitHub Actions CI

name: CI
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_PASSWORD: test
          POSTGRES_DB: testdb
        ports: ["5432:5432"]
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install uv && uv sync
      - run: uv run ruff check .
      - run: uv run mypy src/
      - run: uv run pytest --cov=src --cov-report=xml -x
        env:
          DATABASE_URL: postgresql+asyncpg://postgres:test@localhost:5432/testdb
          JWT_SECRET: test-secret-key-at-least-32-chars

Production Checklist

P0 — Mandatory:

  • [ ] All secrets from environment variables (SecretStr)
  • [ ] HTTPS enforced
  • [ ] CORS configured per environment
  • [ ] Rate limiting on auth endpoints
  • [ ] Input validation on all endpoints
  • [ ] Structured error responses (no stack traces)
  • [ ] Health + readiness endpoints
  • [ ] Database connection pooling
  • [ ] Migrations run before deploy
  • [ ] Structured logging (JSON)
  • [ ] Tests passing in CI

P1 — Recommended:

  • [ ] OpenTelemetry tracing
  • [ ] Prometheus metrics endpoint
  • [ ] Background task queue (Celery/ARQ)
  • [ ] Redis caching layer
  • [ ] API versioning strategy
  • [ ] Request/response logging
  • [ ] Dependency security scanning
  • [ ] Performance benchmarks established

Phase 11: Advanced Patterns

Middleware Stack Order

# Applied bottom-to-top (last added = first executed)
app.add_middleware(GZipMiddleware, minimum_size=1000)    # 5. Compress
app.add_middleware(CORSMiddleware, ...)                  # 4. CORS
app.add_middleware(RequestIDMiddleware)                   # 3. Request ID
app.add_middleware(RateLimitMiddleware)                   # 2. Rate limit
app.add_middleware(TrustedHostMiddleware, allowed=["*"])  # 1. Host check

Pagination with Cursor-Based Option

from fastapi import Query

class PaginationParams:
    def __init__(
        self,
        page: int = Query(1, ge=1, description="Page number"),
        page_size: int = Query(20, ge=1, le=100, description="Items per page"),
    ):
        self.offset = (page - 1) * page_size
        self.limit = page_size
        self.page = page
        self.page_size = page_size

@router.get("/users", response_model=PaginatedResponse[UserResponse])
async def list_users(
    pagination: PaginationParams = Depends(),
    service: UserService = Depends(get_user_service),
):
    items, total = await service.list(
        offset=pagination.offset, limit=pagination.limit
    )
    return PaginatedResponse(
        items=items, total=total,
        page=pagination.page, page_size=pagination.page_size,
        has_next=(pagination.offset + pagination.limit) < total,
    )

WebSocket Pattern

from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
    
    async def connect(self, user_id: str, ws: WebSocket):
        await ws.accept()
        self.connections[user_id] = ws
    
    def disconnect(self, user_id: str):
        self.connections.pop(user_id, None)
    
    async def send(self, user_id: str, message: dict):
        if ws := self.connections.get(user_id):
            await ws.send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # Process message
    except WebSocketDisconnect:
        manager.disconnect(user_id)

File Upload Pattern

from fastapi import UploadFile, File

@router.post("/upload")
async def upload_file(
    file: UploadFile = File(..., description="File to upload"),
    user: User = Depends(get_current_user),
):
    # Validate
    if file.size and file.size > 10 * 1024 * 1024:  # 10MB
        raise ValidationError("File too large (max 10MB)")
    
    allowed_types = {"image/jpeg", "image/png", "application/pdf"}
    if file.content_type not in allowed_types:
        raise ValidationError(f"File type not allowed: {file.content_type}")
    
    # Save
    contents = await file.read()
    path = f"uploads/{user.id}/{file.filename}"
    # Save to S3/local storage...
    
    return {"filename": file.filename, "size": len(contents)}

Phase 12: Common Mistakes

| # | Mistake | Fix | |---|---------|-----| | 1 | Sync database calls in async app | Use async SQLAlchemy/databases | | 2 | Business logic in route handlers | Move to service layer | | 3 | No input validation | Pydantic models on every endpoint | | 4 | Returning ORM models directly | Use response schemas (from_attributes) | | 5 | Hardcoded config values | Pydantic Settings + env vars | | 6 | No error handling strategy | Custom exception hierarchy + global handler | | 7 | Missing health checks | /health + /ready endpoints | | 8 | print() for logging | structlog with JSON output | | 9 | No pagination on list endpoints | Default limit, max cap (100) | | 10 | Testing against production DB | Test fixtures with separate DB |

Quality Scoring (0–100)

| Dimension | Weight | 0–25 | 50 | 75 | 100 | |-----------|--------|------|----|----|-----| | Type Safety | 15% | No types | Partial Pydantic | Full schemas | Strict mypy pass | | Error Handling | 15% | Bare HTTPException | Custom errors | Full hierarchy | + monitoring | | Testing | 15% | None | Happy path | 80%+ coverage | + contract tests | | Security | 15% | No auth | Basic JWT | + RBAC + rate limit | + scanning + audit | | Performance | 10% | Sync everything | Async DB | + caching | + profiling | | Observability | 10% | print() | Structured logs | + tracing | + metrics + alerts | | Database | 10% | Raw SQL | ORM + migrations | + repository pattern | + connection tuning | | Deployment | 10% | Manual | Dockerfile | + CI/CD | + health + rollback |

Scoring: Your Score = Σ (dimension score × weight). < 40 = critical, 40–60 = needs work, 60–80 = solid, 80+ = production-grade.

10 Commandments of FastAPI Production

  1. Pydantic models at every boundary — request, response, config
  2. Async all the way down — one sync call blocks the event loop
  3. Services own business logic — routers are thin wrappers
  4. Dependency injection for testabilityDepends() is your best friend
  5. Structured errors, structured logs — JSON everything
  6. Health checks are non-negotiable — liveness + readiness
  7. Test the sad paths — 40% of tests should be error cases
  8. Migrations before deployment — never modify schema manually
  9. Secrets in environment, never in codeSecretStr enforces this
  10. Profile before optimizing — measure, don't guess

Natural Language Commands

  • audit my FastAPI project → Run health check, identify gaps
  • set up a new FastAPI project → Generate project structure + config
  • add authentication to my API → JWT + RBAC dependency pattern
  • create a CRUD feature for [resource] → Full router/service/repo/schemas
  • optimize my database queries → Connection pooling + async + N+1 prevention
  • add structured logging → Structlog + request ID middleware
  • write tests for [feature] → Async test patterns + fixtures
  • prepare for production deployment → Dockerfile + CI + checklist
  • add caching to my API → Redis caching pattern
  • set up error handling → Custom exception hierarchy + global handler
  • add WebSocket support → Connection manager pattern
  • review my API security → 10-point security checklist audit

Level up your FastAPI APIs → Get the AfrexAI SaaS Context Pack ($47) for complete SaaS architecture, pricing strategies, and go-to-market playbooks.

🔗 More free skills by AfrexAI:

🛒 Browse all packs → AfrexAI Storefront