SqlAlchemy 1.4 async ORM with FastAPI
在我的博客前面,我使用数据库库建立了一个使用 async sqlalchemy core 的后端服务。原因很简单,SQLAlchemy 还不支持 ORM 中的异步。现在,由于 SQLAlchemy 1.4已经在这里了,我们可以只使用这个包进行适当的设置!
本教程将介绍如何在 FastAPI、 PostgreSQL、 SQLAlchemy 1.4和 alembic 上设置可用于生产的应用程序。所有东西都使用 asyncio。
要求
- Python = “ ^ 3.9”
- Fastapi = “ ^ 0.70.0”
- SQLAlchemy = “ ^ 1.4.25”
- Uvicorn = “ ^ 0.15.0”
- Asyncpg = “ ^ 0.24.0”
- Alembic = “ ^ 1.7.4”
- Psycopg2 = “ ^ 2.9.1”
项目设置
首先,我想分享一个结构,这个项目看起来是这样的:
├── Dockerfile
├── README.md
├── alembic.ini
├── app
│ ├── __init__.py
│ ├── api
│ ├── core
│ ├── db
│ ├── main.py
│ └── models
├── docker-compose.yml
├── poetry.lock
├── pyproject.toml
└── tests
├── __init__.py
├── app
└── conftest.py
从顶部开始,我们将在项目的根部存储配置文件,如 docker、迁移、 poetry 等。
接下来,我们的 Python 应用程序模块放在 app 目录中。
最后,测试模块位于同一目录级别。将它保留在应用程序模块之外的原因与诗歌分离依赖的原因类似。对于由代码创建的各种构建,最终映像或服务器上不需要测试。
在后台构建和运行系统
教程代码可以在我的 github 上找到
git clone git@github.com:rglsk/fastapi-sqlalchemy-1.4-async.git
数据库设置
根据结构,db 目录显示如下:
│ ├── db
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── base_class.py
│ │ ├── errors.py
│ │ ├── migrations
│ │ │ ├── README
│ │ │ ├── __pycache__
│ │ │ ├── env.py
│ │ │ ├── script.py.mako
│ │ │ └── versions
│ │ ├── repositories
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ └── coupons.py
│ │ ├── session.py
│ │ └── tables
│ │ ├── __init__.py
│ │ └── coupons.py
本地数据库
version: "3.7"
services:
postgres:
image: postgres:12.5
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- postgresql_data:/var/lib/postgresql/data/
expose:
- 5432
ports:
- 5432:5432
volumes:
postgresql_data:
陈述性基础
要定义映射类并开始创建表,必须具有 Base。在给定的示例中,使用了 _ declarative decorator,它只是将给定的类适配为 declarative _ base ()。此外,每个表都必须包含主键,即 UUID (这是我的个人偏好)。
# base_class.py
import uuid
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: uuid.UUID = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
注意
声明性 _ base ()函数现在是更通用的注册表类的特殊化。该函数还从 declarative.ext 包移动到 sqlalchemy.orm 包。
表格
项目中创建的每个表都需要继承声明基。
# tables/coupons.py
from sqlalchemy import Column, Integer, String
from app.db.base_class import Base
class Coupon(Base):
__tablename__ = "coupon"
code = Column(String, nullable=False, unique=True)
init_count = Column(Integer, nullable=False)
remaining_count = Column(Integer, nullable=False)
数据库会话
想要与数据库建立连接,我们需要定义会话。Project 需要异步工作,因此将使用 create_async _ engine。它的工作原理与传统的引擎 API 相同,但是它还使它成为异步的。
# session.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_async_engine(
settings.async_database_url,
echo=settings.DB_ECHO_LOG,
)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
迁移
长话短说,在生产过程中,我们不可能每次发出请求或构建时都创建和删除表。为了解决这个问题,我们将使用 alembic。这是我自己选择的迁移工具,特别是在处理 SQLAlchemy 时。
Init 迁移
alemic init app/db/migrations
Alembic ini 配置
一切都可以像自动生成一样,最重要的是保持位置脚本路径的正确性。
# alembic.ini
[alembic]
script_location = app/db/migrations
配置 Alembic env
这个文件最重要的部分是设置正确的数据库 URL。
请注意,Base 类是从 Base.py 而不是 Base _ class.py 导入的。这是因为 alembic 需要收集所有映射表。
from app.core.config import settings # noqa
from app.db.base import Base # noqa
config = context.config
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
target_metadata = Base.metadata
fileConfig(config.config_file_name)
通过 alembic 导入所有表模型
# app/db/base.py
from app.db.base_class import Base # noqa: F401
from app.db.tables.coupons import Coupon # noqa: F401
生成迁移
docker-compose run app alembic revision --autogenerate
运行迁移
docker-compose run app alembic upgrade head
仓库
作为一个领域驱动设计,我喜欢在 Python 代码库中使用存储库模式。它是服务和数据库之间的代码层,有助于处理数据库操作。不幸的是,编写代码和添加所有单元测试需要额外的工作时间。我看得出这是值得的!它使代码保持在非常好的结构中,按层分组,并且它不允许用特定的 ORM 库甚至数据库绑定项目。
在本教程中,baserepostar 仅包含两个基本方法,它们都是等待提交的异步方法。
# app/db/repositories/base.py
import abc
from typing import Generic, TypeVar, Type
from uuid import uuid4, UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.errors import DoesNotExist
from app.models.schema.base import BaseSchema
IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseSchema)
SCHEMA = TypeVar("SCHEMA", bound=BaseSchema)
TABLE = TypeVar("TABLE")
class BaseRepository(Generic[IN_SCHEMA, SCHEMA, TABLE], metaclass=abc.ABCMeta):
def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
self._db_session: AsyncSession = db_session
@property
@abc.abstractmethod
def _table(self) -> Type[TABLE]:
...
@property
@abc.abstractmethod
def _schema(self) -> Type[SCHEMA]:
...
async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
entry = self._table(id=uuid4(), **in_schema.dict())
self._db_session.add(entry)
await self._db_session.commit()
return self._schema.from_orm(entry)
async def get_by_id(self, entry_id: UUID) -> SCHEMA:
entry = await self._db_session.get(self._table, entry_id)
if not entry:
raise DoesNotExist(
f"{self._table.__name__}<id:{entry_id}> does not exist"
)
return self._schema.from_orm(entry)
Baserepossery 的用法:
# app/repositories/coupons.py
from typing import Type
from app.db.repositories.base import BaseRepository
from app.db.tables.coupons import Coupon
from app.models.schema.coupons import InCouponSchema, CouponSchema
class CouponsRepository(BaseRepository[InCouponSchema, CouponSchema, Coupon]):
@property
def _in_schema(self) -> Type[InCouponSchema]:
return InCouponSchema
@property
def _schema(self) -> Type[CouponSchema]:
return CouponSchema
@property
def _table(self) -> Type[Coupon]:
return Coupon
API 处理程序
│ ├── api
│ │ ├── __init__.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ └── db.py
│ │ └── routes
│ │ ├── __init__.py
│ │ ├── api.py
│ │ └── coupons.py
使用 FastAPI 我们需要讨论依赖注入!长话短说,这意味着我们在代码中使用一种机制来声明工作和使用所需的东西(称为依赖项) ,然后 FastAPI 将施展魔法,并在需要时注入它们。
我们在这个项目中唯一的依赖,但是一个重要的依赖是数据库。它创建一个异步会话,生成异步会话,并在会话结束时提交。
# api/dependencies/db.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session
async def get_db() -> AsyncSession:
"""
Dependency function that yields db sessions
"""
async with async_session() as session:
yield session
await session.commit()
这种使用是由于名为 Depends 的 FastAPI 内建类发生的,该类将我们的依赖注入到处理程序中。
# api/routes/coupons.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.api.dependencies.db import get_db
from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import OutCouponSchema, InCouponSchema
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=OutCouponSchema)
async def create_coupon(
payload: InCouponSchema, db: AsyncSession = Depends(get_db)
) -> OutCouponSchema:
coupons_repository = CouponsRepository(db)
coupon = await coupons_repository.create(payload)
return OutCouponSchema(**coupon.dict())
测试
现在我们终于好好开始真正的乐趣了!
在代码库中使用异步方法来正确地运行测试,我们需要在 scope = session 中设置事件循环。
# tests/conftest.py
@pytest.fixture(scope="session")
def event_loop(request) -> Generator:
"""Create an instance of the default event loop for each test case."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
接下来,我们需要创建一个带有新的 db _ session 的 fixture,它将只在我们的测试中运行,创建和删除表,并允许执行数据库操作。
@pytest.fixture()
async def db_session() -> AsyncSession:
async with engine.begin() as connection:
await connection.run_sync(Base.metadata.drop_all)
await connection.run_sync(Base.metadata.create_all)
async with async_session(bind=connection) as session:
yield session
await session.flush()
await session.rollback()
只有上面的会话,我们就可以开始在测试中创建条目了。不幸的是,当我们尝试在我们的处理程序中连接 DB 时,我们会遇到这个问题。
为了解决它,我们需要覆盖
@pytest.fixture()
def override_get_db(db_session: AsyncSession) -> Callable:
async def _override_get_db():
yield db_session
return _override_get_db
@pytest.fixture()
def app(override_get_db: Callable) -> FastAPI:
from app.api.dependencies.db import get_db
from app.main import app
app.dependency_overrides[get_db] = override_get_db
return app
最后,我们需要创建一个 httpx lib 可以帮助我们的异步测试客户端。
from typing import AsyncGenerator
import pytest
from fastapi import FastAPI
from httpx import AsyncClient
@pytest.fixture()
async def async_client(app: FastAPI) -> AsyncGenerator:
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
配置了我们的装置之后,没有什么比创建我们的第一个测试更令人高兴的了。测试的重要部分是记住定义 pytest.mark.asyncio。它可以按原样实现,例如使用 pytestmark 或者将它作为修饰符附加到每个测试方法。
from unittest import mock
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import InCouponSchema
pytestmark = pytest.mark.asyncio
async def test_coupon_create(
async_client: AsyncClient, db_session: AsyncSession
) -> None:
coupons_repository = CouponsRepository(db_session)
payload = {
"code": "PIOTR",
"init_count": 100,
}
response = await async_client.post("/v1/coupons/", json=payload)
coupon = await coupons_repository.get_by_id(response.json()["id"])
assert response.status_code == status.HTTP_201_CREATED
assert response.json() == {
"code": payload["code"],
"init_count": payload["init_count"],
"remaining_count": payload["init_count"],
"id": str(coupon.id),
}
摘要
这个设置就像一个符咒。它在大型装载服务上进行了实战测试。当然,在请求中间会出现一些问题,比如连接池和关闭连接(但这是下一篇博客文章的主题!).
我的整体感觉相当不错,需要一些时间来安排一切并且很好地测试它,但是从我的角度来看,这是值得的。让 DB 连接异步会给你带来很多好处吗?这取决于用例..。
FastAPI 提示和技巧
- 使用 Pydantic 模型配置: orm _ mode = True
- 保持您的 Pydantic 模式清晰可读
- 尝试创建自己的存储库甚至服务,我没有太多时间来完全构建它们(我根本没有构建服务层)
- 请使用黑色代码格式化程序
原创文章,作者:flypython,如若转载,请注明出处:http://flypython.com/advanced-python/563.html