CQRS in Python: Decoupling Reads and Writes with FastAPI and MongoDB

The Core Concept of CQRS

Command Query Responsibility Segregation (CQRS) fundamentally changes how we interact with data by splitting the application into two distinct paths. Commands handle operations that change state, like creating or updating a ticket. Queries handle the retrieval of data. In a standard CRUD application, the same data model often serves both purposes, leading to performance bottlenecks when read requirements—like complex dashboard aggregates or derived fields—start to interfere with write performance.

CQRS solves this by allowing you to optimize the read and write models independently.


Prerequisites and Key Tools

To implement this pattern, you should be comfortable with Python and basic asynchronous programming. Familiarity with FastAPI for building APIs and Pydantic for data validation is essential. For storage, we use MongoDB through the PyMongo driver; MongoDB's document model handles the varying shapes of read projections well.

Code Walkthrough: Splitting the Model

We start by separating our single collection into two: ticket_commands for the source of truth and ticket_reads for our optimized views. Instead of one generic update endpoint, we create specific commands that represent business intent.

from pydantic import BaseModel, field_validator

class UpdateStatus(BaseModel):
    # The command carries only the data this one business action needs
    status: str

    @field_validator("status")
    @classmethod
    def status_must_not_be_closed(cls, v):
        # Closing a ticket is a separate intent, so reject it here
        if v == "closed":
            raise ValueError("Cannot manually close via this command")
        return v

def update_status_command(db, ticket_id, command: UpdateStatus):
    # Business logic lives here, isolated from the API
    result = db.ticket_commands.update_one(
        {"_id": ticket_id},
        {"$set": {"status": command.status}}
    )
    # matched_count is 0 when no document has this _id
    if result.matched_count == 0:
        raise ValueError("Ticket not found")

By moving logic into these command functions, the FastAPI endpoints become thin wrappers. This isolation ensures that your business rules stay consistent regardless of how the data is displayed.

Implementing the Projector

The magic of CQRS happens in the projection phase. Every time a command modifies the write database, we trigger a projector function to update the read model. This read model includes pre-computed fields like preview so the list endpoint doesn't have to calculate them on the fly.

def project_ticket(db, ticket_id):
    # Fetch the current state from the write model
    ticket = db.ticket_commands.find_one({"_id": ticket_id})
    if ticket is None:
        # find_one returns None when the ticket does not exist
        raise ValueError("Ticket not found")

    # Prepare optimized read model
    projection = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],  # Pre-computed preview
        "has_note": "note" in ticket
    }

    # Insert or overwrite the document in the read collection
    db.ticket_reads.replace_one({"_id": ticket_id}, projection, upsert=True)
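To see how the pieces fit together, the following sketch wires a command, the projector, and a query into one flow. It is only an approximation: plain dictionaries stand in for the two MongoDB collections, and the function names create_ticket, update_status, and list_tickets are hypothetical, chosen for this example.

```python
# Plain dicts stand in for the two collections (assumption: no MongoDB here).
ticket_commands = {}  # write model: full ticket documents
ticket_reads = {}     # read model: compact list-view projections


def project_ticket(ticket_id):
    """Rebuild the read-side document for one ticket from the write model."""
    ticket = ticket_commands[ticket_id]
    ticket_reads[ticket_id] = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],
        "has_note": "note" in ticket,
    }


def create_ticket(ticket_id, subject, message):
    """Command: write to the source of truth, then project."""
    ticket_commands[ticket_id] = {
        "_id": ticket_id,
        "subject": subject,
        "message": message,
        "status": "open",
    }
    project_ticket(ticket_id)


def update_status(ticket_id, status):
    """Command: change state on the write side, then project."""
    if ticket_id not in ticket_commands:
        raise ValueError("Ticket not found")
    ticket_commands[ticket_id]["status"] = status
    project_ticket(ticket_id)


def list_tickets(status=None):
    """Query: reads only the projection, never the write model."""
    return [
        doc for doc in ticket_reads.values()
        if status is None or doc["status"] == status
    ]


create_ticket("t1", "Login fails", "x" * 80)
update_status("t1", "pending")
rows = list_tickets("pending")
```

The query never touches the full message body; it returns whatever the projector last wrote, which is why every command that changes state has to trigger a projection.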

Syntax Notes and Conventions

In this implementation, we use Pydantic for more than just validation; we use it to define the contract of our commands. Note the use of replace_one with upsert=True in the projector: it inserts the read document if none exists and overwrites it otherwise, so the read model stays in sync whether the ticket is new or being updated. We also rely on FastAPI dependency injection to pass the database handle into our commands and queries.

Practical Examples

This architecture is a powerhouse for applications like analytics dashboards. Imagine a support system with millions of tickets. Instead of running expensive count or group aggregations on your main production collection, you query a read-optimized collection that only contains the necessary status flags. This keeps the write database responsive for agents while providing instant insights for managers.
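One way to push this idea a step further is to let the projector maintain the dashboard counts incrementally, so the dashboard query becomes a simple lookup. The sketch below is an assumption-laden illustration rather than part of the walkthrough: it keeps everything in memory, and the names set_status, project_status_change, and dashboard_summary are hypothetical.

```python
from collections import Counter

ticket_status = {}         # write side: current status per ticket id
status_counts = Counter()  # read side: pre-aggregated counts per status


def project_status_change(old_status, new_status):
    """Projector: adjust the counters instead of recounting every ticket."""
    if old_status == new_status:
        return
    if old_status is not None:
        status_counts[old_status] -= 1
    status_counts[new_status] += 1


def set_status(ticket_id, new_status):
    """Command: record the new status, then update the read model."""
    old_status = ticket_status.get(ticket_id)
    ticket_status[ticket_id] = new_status
    project_status_change(old_status, new_status)


def dashboard_summary():
    """Query: return the non-zero counts without scanning any tickets."""
    return {status: n for status, n in status_counts.items() if n > 0}


set_status("t1", "open")
set_status("t2", "open")
set_status("t1", "pending")
summary = dashboard_summary()
```

The cost of the aggregation moves from read time to write time: each command does a constant amount of extra work, and the dashboard no longer depends on how many tickets exist.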

Tips and Gotchas

The biggest trade-off is eventual consistency. Because there is a small delay between the command execution and the projection, a user might not see their change if they refresh the page right away. You must also handle projection failures: if the projector crashes, your read model will be stale until the projection is rerun. In production systems, consider using a background task queue or MongoDB Change Streams to handle projections asynchronously and reliably.
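The retry idea can be sketched with nothing but the standard library. The example below uses queue.Queue and a worker thread as a stand-in for a real task queue or a change stream listener; the function run_projection_worker, the flaky projector, and the retry limit of three attempts are all assumptions made for illustration.

```python
import queue
import threading


def run_projection_worker(jobs, project, failed, max_attempts=3):
    """Take ticket ids from `jobs` and project each one, retrying on errors."""
    while True:
        ticket_id = jobs.get()
        if ticket_id is None:  # sentinel value: stop the worker
            jobs.task_done()
            return
        for attempt in range(1, max_attempts + 1):
            try:
                project(ticket_id)
                break
            except Exception:
                if attempt == max_attempts:
                    # Give up; a real system would log or dead-letter this id.
                    failed.append(ticket_id)
        jobs.task_done()


# In-memory stand-ins for the write and read models.
writes = {"t1": {"status": "open"}}
reads = {}
attempts = {"count": 0}


def flaky_project(ticket_id):
    """Projector that fails once to simulate a transient error."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("simulated transient failure")
    reads[ticket_id] = dict(writes[ticket_id])


jobs = queue.Queue()
failed = []
worker = threading.Thread(
    target=run_projection_worker,
    args=(jobs, flaky_project, failed),
    daemon=True,
)
worker.start()

jobs.put("t1")   # the command side only enqueues the ticket id
jobs.put(None)   # ask the worker to stop after draining the queue
jobs.join()      # wait until every queued item has been processed
```

The command returns as soon as the id is enqueued, which is exactly where the eventual consistency window comes from. Ids that still fail after the last attempt end up in the failed list, so stale read documents can be detected and reprojected later.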
