FastAPI SQLAlchemy 实战:构建高性能Web应用 – wiki词典


FastAPI SQLAlchemy 实战:构建高性能Web应用

在现代Web开发中,性能和可伸缩性是至关重要的考量。FastAPI 凭借其出色的性能(基于 Starlette 和 Pydantic)、异步支持以及自动生成API文档的能力,成为了构建高性能API的理想选择。而 SQLAlchemy 作为 Python 中最强大和灵活的ORM(对象关系映射)工具之一,能够帮助开发者高效、安全地与各种关系型数据库进行交互。

本文将通过一个实战示例,详细介绍如何将 FastAPI 与异步 SQLAlchemy 2.0 结合起来,构建一个具有 CRUD 功能的Web应用程序,并探讨如何确保其高性能。

为什么选择 FastAPI 和 SQLAlchemy?

  • FastAPI:

    • 高性能: 基于 Starlette 和 Pydantic,提供接近 Go 的性能。
    • 异步支持: 原生支持 async/await,允许高效处理 I/O 密集型任务,如数据库查询和外部API调用,从而提高并发能力。
    • 数据验证与序列化: 使用 Pydantic 进行请求和响应数据的自动验证、序列化和反序列化,减少了样板代码,并增强了API的健壮性。
    • 自动文档: 自动生成 OpenAPI (Swagger UI) 和 ReDoc 文档,极大地简化了API的测试和协作。
    • 依赖注入: 强大的依赖注入系统使得管理数据库会话、身份验证等变得简单和可测试。
  • SQLAlchemy:

    • 功能强大: 提供全面的ORM功能,支持复杂的查询、关系映射、事务管理等。
    • 灵活性: 既可以作为ORM使用,也可以进行核心的SQL表达式构建,满足不同场景的需求。
    • 异步支持(2.0版): SQLAlchemy 2.0 引入了原生的异步支持,与 FastAPI 的异步特性完美契合。
    • 数据库兼容性: 支持广泛的关系型数据库,如 PostgreSQL, MySQL, SQLite 等。

项目结构

我们将构建一个简单的用户和物品管理系统。以下是推荐的项目文件结构:

fastapi_sqlalchemy_app/
├── requirements.txt
├── database.py # 数据库配置和会话管理
├── models.py # SQLAlchemy 模型定义
├── schemas.py # Pydantic 数据模型定义 (用于请求/响应)
└── main.py # FastAPI 应用程序入口和API路由

1. requirements.txt

这个文件列出了项目所需的所有 Python 依赖。请确保安装了支持异步操作的库。

“`
fastapi
uvicorn[standard]
sqlalchemy[asyncio]

如果使用 PostgreSQL,则需要 asyncpg。对于 SQLite,不是必需的。

asyncpg

aiosqlite # 异步 SQLite 驱动
pydantic
“`

2. database.py

此文件负责配置 SQLAlchemy 引擎、会话以及声明式基类。我们将使用异步 SQLAlchemy 2.0 风格。为了简化,我们以 SQLite 为例,但切换到 PostgreSQL 或 MySQL 等其他数据库非常容易。

“`python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

SQLite 数据库 URL

对于生产环境,强烈建议使用 PostgreSQL 或其他成熟的关系型数据库

SQLALCHEMY_DATABASE_URL = “sqlite+aiosqlite:///./sql_app.db”

创建异步引擎

echo=True 会打印所有 SQL 语句,有助于调试

engine = create_async_engine(
SQLALCHEMY_DATABASE_URL, connect_args={“check_same_thread”: False}, echo=True
)

创建异步会话工厂

AsyncSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
class_=AsyncSession,
expire_on_commit=False, # 避免在提交后使对象过期
)

Base = declarative_base()

依赖项注入函数,用于获取数据库会话

async def get_db():
async with AsyncSessionLocal() as session:
yield session

“`

  • create_async_engine: 用于创建异步数据库引擎。
  • AsyncSessionLocal: 异步会话工厂,每次请求都会从这里获取一个独立的数据库会话。
  • declarative_base(): 用于定义 SQLAlchemy ORM 模型。
  • get_db(): 这是一个 FastAPI 依赖函数,通过 yield 实现了上下文管理器,确保在请求结束后正确关闭数据库会话,这是最佳实践。

3. models.py

此文件定义了 SQLAlchemy 模型,它们映射到数据库表。我们将创建 UserItem 两个模型。

“`python
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship

from .database import Base

class User(Base):
tablename = “users”

id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)

items = relationship("Item", back_populates="owner") # 定义与 Item 的一对多关系

class Item(Base):
tablename = “items”

id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id")) # 外键关联 User

owner = relationship("User", back_populates="items") # 定义与 User 的多对一关系

“`

  • Base: 从 database.py 导入的声明式基类。
  • Column: 定义表字段及其类型。
  • primary_key=True, index=True, unique=True: 数据库约束和优化。
  • relationship(): 定义模型之间的关系,这里是 UserItem 之间的一对多关系。back_populates 确保了关系的双向性。

4. schemas.py

此文件定义了 Pydantic 模型,用于请求和响应数据验证以及序列化。这些模型定义了API接口的数据结构,而不是数据库表结构。

“`python
from pydantic import BaseModel
from typing import List, Optional

class ItemBase(BaseModel):
title: str
description: Optional[str] = None

class ItemCreate(ItemBase):
pass # 创建时不需要额外的字段

class Item(ItemBase):
id: int
owner_id: int

class Config:
    orm_mode = True # 允许 Pydantic 从 ORM 对象读取数据 (兼容 SQLAlchemy 对象)

class UserBase(BaseModel):
email: str

class UserCreate(UserBase):
password: str # 创建用户时需要密码

class User(UserBase):
id: int
is_active: bool
items: List[Item] = [] # 用户可以包含多个物品

class Config:
    orm_mode = True # 允许 Pydantic 从 ORM 对象读取数据

“`

  • BaseModel: Pydantic 的基类。
  • Optional[str]: 表示该字段是可选的,且默认为 None
  • class Config: orm_mode = True: 这是关键,它告诉 Pydantic 可以从 SQLAlchemy ORM 对象读取数据,即使这些数据不是字典形式。这使得将数据库对象直接转换为API响应变得非常方便。

5. main.py

这是 FastAPI 应用程序的主文件,包含 API 路由和 CRUD 操作的实现。

“`python
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select # 推荐使用 sqlalchemy.future.select 进行异步查询
from typing import List

from . import models, schemas
from .database import engine, get_db

创建所有数据库表

在生产环境中,您通常会使用 Alembic 等迁移工具来管理数据库模式的更改

async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(models.Base.metadata.create_all)

app = FastAPI()

在应用程序启动时创建数据库表

@app.on_event(“startup”)
async def on_startup():
await create_db_and_tables()

用户相关的 CRUD 操作

@app.post(“/users/”, response_model=schemas.User, status_code=status.HTTP_201_CREATED)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
# 检查邮箱是否已注册
result = await db.execute(select(models.User).filter(models.User.email == user.email))
db_user = result.scalar_one_or_none()
if db_user:
raise HTTPException(status_code=400, detail=”Email already registered”)

# 创建用户并模拟哈希密码
fake_hashed_password = user.password + "notreallyhashed" # 生产环境请使用安全的哈希算法
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)

db.add(db_user)
await db.commit() # 提交事务
await db.refresh(db_user) # 刷新对象以获取数据库生成的值 (如 id)
return db_user

@app.get(“/users/”, response_model=List[schemas.User])
async def read_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
# 查询所有用户,支持分页
result = await db.execute(select(models.User).offset(skip).limit(limit))
users = result.scalars().all() # 获取所有结果
return users

@app.get(“/users/{user_id}”, response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
# 根据 ID 查询单个用户
result = await db.execute(select(models.User).filter(models.User.id == user_id))
user = result.scalar_one_or_none() # 获取单个结果或 None
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return user

物品相关的 CRUD 操作

@app.post(“/users/{user_id}/items/”, response_model=schemas.Item, status_code=status.HTTP_201_CREATED)
async def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: AsyncSession = Depends(get_db)
):
# 检查用户是否存在
result = await db.execute(select(models.User).filter(models.User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)

# 为指定用户创建物品
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
await db.commit()
await db.refresh(db_item)
return db_item

@app.get(“/items/”, response_model=List[schemas.Item])
async def read_items(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
# 查询所有物品,支持分页
result = await db.execute(select(models.Item).offset(skip).limit(limit))
items = result.scalars().all()
return items

“`

  • app = FastAPI(): 创建 FastAPI 应用实例。
  • @app.on_event("startup"): 装饰器,在应用程序启动时执行 create_db_and_tables 函数,用于创建数据库表(仅用于开发环境,生产环境应使用迁移工具)。
  • Depends(get_db): FastAPI 的依赖注入系统,每次请求都会调用 get_db 获取一个数据库会话。
  • db.execute(select(...)): 执行异步查询。sqlalchemy.future.select 是 SQLAlchemy 2.0 中推荐的查询构建方式。
  • result.scalar_one_or_none(): 获取单个查询结果,如果不存在则返回 None
  • result.scalars().all(): 获取所有查询结果。
  • db.add(), db.commit(), db.refresh(): 经典的 SQLAlchemy ORM 操作,用于添加、提交和刷新数据库对象。注意这些现在都是 awaitable 的。
  • HTTPException: FastAPI 用于处理HTTP错误响应。
  • status_code=status.HTTP_201_CREATED: 为 POST 请求设置标准的 201 Created 状态码。

如何运行应用程序

  1. 创建项目文件夹和文件:
    在您的本地机器上,手动创建一个名为 fastapi_sqlalchemy_app 的文件夹,并按照上述“项目结构”部分创建相应的 requirements.txt, database.py, models.py, schemas.py, main.py 文件。

  2. 粘贴代码:
    将上面提供的代码内容分别粘贴到对应的文件中。

  3. 创建并激活虚拟环境 (推荐):
    打开终端或命令行,导航到 fastapi_sqlalchemy_app 文件夹,然后运行以下命令:

    “`bash
    python -m venv venv

    在 macOS/Linux 上:

    source venv/bin/activate

    在 Windows 上:

    venv\Scripts\activate
    “`

  4. 安装依赖:
    在激活的虚拟环境中,运行以下命令安装 requirements.txt 中列出的所有库:

    bash
    pip install -r requirements.txt

  5. 运行 FastAPI 应用程序:
    fastapi_sqlalchemy_app 文件夹中,运行以下命令:

    bash
    uvicorn main:app --reload

    • main: 指的是 main.py 文件。
    • app: 指的是 main.py 文件中创建的 FastAPI() 实例。
    • --reload: 允许在代码更改时自动重新加载服务器,方便开发。
  6. 访问 API 文档:
    应用程序启动后,您可以在浏览器中访问以下 URL 来查看自动生成的 API 文档 (Swagger UI):
    http://127.0.0.1:8000/docs

    您也可以访问 ReDoc 文档:
    http://127.0.0.1:8000/redoc

现在,您可以使用这些文档来测试您的 FastAPI 应用程序,创建用户和项目,并执行其他 CRUD 操作。

高性能注意事项

为了确保您的 FastAPI + SQLAlchemy 应用程序具有高性能,请考虑以下几点:

  1. 充分利用异步操作:
    FastAPI 和 SQLAlchemy 2.0 的异步支持是性能的关键。始终使用 await 调用数据库操作(如 db.execute, db.commit 等),这样您的应用程序在等待数据库响应时可以处理其他请求,从而提高并发吞吐量。

  2. 数据库连接池:
    SQLAlchemy 引擎默认会管理连接池。连接池重用已经建立的数据库连接,减少了每次请求建立新连接的开销。对于高并发场景,正确配置连接池大小和超时非常重要。

  3. Pydantic 的高效验证:
    Pydantic 提供了高效的数据验证和序列化。它在入口和出口处确保数据格式的正确性,减少了在业务逻辑层进行手动验证的需要,从而提高了API的健壮性和开发效率。

  4. FastAPI 的依赖注入:
    FastAPI 的依赖注入系统不仅使得代码模块化和可测试,还能高效管理资源。例如,get_db 依赖函数确保每个请求都有一个独立的数据库会话,并且在请求结束后会话会被正确关闭,避免了资源泄露。

  5. 选择合适的数据库:
    对于生产环境,强烈建议使用 PostgreSQL、MySQL 或其他成熟的关系型数据库,而不是 SQLite。这些数据库提供了更高级的并发控制、事务管理、索引优化以及更强大的性能分析工具。

  6. 数据库迁移工具:
    在生产应用程序中,不应在每次启动时使用 Base.metadata.create_all 来创建或更新数据库表。这会丢失现有数据。请使用像 Alembic 这样的数据库迁移工具来安全、有序地管理数据库模式的更改。

  7. 查询优化:

    • 避免 N+1 查询问题: 当查询一个主对象列表及其关联的子对象时,如果每次都单独查询子对象,会导致 N+1 个查询。使用 SQLAlchemy 的 selectinloadjoinedload 策略进行急加载(Eager Loading)可以避免此问题。
    • 添加索引: 对经常用于查询条件的字段(如 email, owner_id)添加数据库索引可以显著加快查询速度。
    • 合理使用 limitoffset 对于返回大量数据的API,务必实现分页,避免一次性加载所有数据导致内存溢出和响应延迟。
  8. 缓存策略:
    对于不经常变化但访问频繁的数据,可以考虑引入缓存机制(如 Redis),减少数据库的负载。

总结

FastAPI 和异步 SQLAlchemy 2.0 提供了一个强大的组合,可以帮助开发者构建高性能、可扩展且易于维护的Web应用程序。通过遵循本文中的最佳实践,并注意上述高性能考量,您可以充分发挥这两个框架的潜力,为用户提供卓越的体验。


滚动至顶部