CQRS in Python: Decoupling Reads and Writes with FastAPI and MongoDB

ArjanCodes // 4 min read

The Core Concept of CQRS

Command Query Responsibility Segregation (CQRS) fundamentally changes how we interact with data by splitting the application into two distinct paths. Commands handle operations that change state, like creating or updating a ticket. Queries handle the retrieval of data. In a standard CRUD application, the same data model often serves both purposes, which leads to performance bottlenecks when read requirements (complex dashboard aggregates, derived fields) start to interfere with write performance. CQRS solves this by allowing you to optimize the read and write models independently.


Prerequisites and Key Tools

To implement this pattern, you should be comfortable with Python and basic asynchronous programming. Familiarity with FastAPI for building APIs and Pydantic for data validation is essential. For storage, we use MongoDB through its Python driver, as its document model excels at handling the varying shapes of read projections.

Code Walkthrough: Splitting the Model

We start by separating our single collection into two: ticket_commands for the source of truth and ticket_reads for our optimized views. Instead of one generic update endpoint, we create specific commands that represent business intent.

from pydantic import BaseModel, field_validator

class UpdateStatus(BaseModel):
    """Command payload: the intent to move a ticket to a new status."""
    status: str

    @field_validator("status")
    @classmethod
    def status_must_not_be_closed(cls, v):
        # Closing a ticket is a separate business action, not a status edit
        if v == "closed":
            raise ValueError("Cannot manually close via this command")
        return v

def update_status_command(db, ticket_id, command: UpdateStatus):
    # Business logic lives here, isolated from the API
    result = db.ticket_commands.update_one(
        {"_id": ticket_id},
        {"$set": {"status": command.status}}
    )
    # matched_count is 0 when no document has this _id
    if result.matched_count == 0:
        raise ValueError("Ticket not found")

By moving logic into these command functions, the endpoints become thin wrappers. This isolation ensures that your business rules stay consistent regardless of how the data is displayed.
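
To make "thin wrapper" concrete, here is a minimal sketch that runs without FastAPI, Pydantic, or MongoDB. Everything in it is a stand-in chosen for illustration: the `TICKET_COMMANDS` dict plays the role of the write collection, a dataclass replaces the Pydantic model, and `update_status_endpoint` returns a `(status_code, body)` tuple instead of being a real route. Raising `LookupError` for a missing ticket is also an assumption made here so the wrapper can tell "not found" apart from "invalid input".

```python
from dataclasses import dataclass

# Hypothetical in-memory stand-in for the ticket_commands collection.
TICKET_COMMANDS = {"t1": {"_id": "t1", "status": "open"}}


@dataclass(frozen=True)
class UpdateStatus:
    """Stdlib stand-in for the Pydantic command model."""
    status: str

    def __post_init__(self):
        if self.status == "closed":
            raise ValueError("Cannot manually close via this command")


def update_status_command(store, ticket_id, command):
    """Command handler: owns the business rule and the write."""
    if ticket_id not in store:
        raise LookupError("Ticket not found")
    store[ticket_id]["status"] = command.status


def update_status_endpoint(store, ticket_id, payload):
    """Thin wrapper: parse input, call the command, map errors to codes."""
    try:
        command = UpdateStatus(**payload)
    except (TypeError, ValueError) as exc:
        return 422, {"detail": str(exc)}
    try:
        update_status_command(store, ticket_id, command)
    except LookupError as exc:
        return 404, {"detail": str(exc)}
    return 200, {"status": "ok"}
```

The wrapper contains no business rules of its own. It only translates between the transport layer and the command, which is what keeps the rules reusable from other entry points such as a CLI or a background job.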

Implementing the Projector

The magic of CQRS happens in the projection phase. Every time a command modifies the write database, we trigger a projector function to update the read model. This read model includes pre-computed fields like message_preview so the list endpoint doesn't have to calculate them on the fly.

def project_ticket(db, ticket_id):
    # Fetch from write model
    ticket = db.ticket_commands.find_one({"_id": ticket_id})
    if ticket is None:
        # find_one returns None when the ticket does not exist
        raise ValueError("Ticket not found")

    # Prepare optimized read model
    projection = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],  # Pre-computed preview
        "has_note": "note" in ticket
    }

    # Update the read collection
    db.ticket_reads.replace_one({"_id": ticket_id}, projection, upsert=True)
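
To show how the pieces connect end to end, the following sketch wires a command, the projector, and a query together. It is an illustration under stated assumptions, not the article's implementation: two plain dicts (`ticket_commands`, `ticket_reads`) stand in for the MongoDB collections, and `update_status` and `list_tickets` are hypothetical names.

```python
# Hypothetical in-memory stand-ins for the two collections.
ticket_commands = {
    "t1": {
        "_id": "t1",
        "subject": "Login fails",
        "status": "open",
        "message": "I cannot log in since this morning.",
        "note": "VIP",
    },
}
ticket_reads = {}


def project_ticket(ticket_id):
    """Rebuild the read document for one ticket from the write model."""
    ticket = ticket_commands[ticket_id]
    ticket_reads[ticket_id] = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],
        "has_note": "note" in ticket,
    }


def update_status(ticket_id, status):
    """Command: change state, then refresh the projection."""
    ticket_commands[ticket_id]["status"] = status
    project_ticket(ticket_id)


def list_tickets(status=None):
    """Query: reads precomputed documents only, never the write model."""
    return [
        doc for doc in ticket_reads.values()
        if status is None or doc["status"] == status
    ]
```

Calling `update_status("t1", "pending")` followed by `list_tickets(status="pending")` returns the projected document, including the precomputed `preview` and `has_note` fields, without the query ever touching `ticket_commands`.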

Syntax Notes and Conventions

In this implementation, we use Pydantic for more than just validation; we use it to define the contract of our commands. Note the use of replace_one with upsert=True in the projector. This ensures that the read model stays in sync whether the ticket is new or being updated. We also rely on dependency injection to pass the database session into our commands and queries.
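
The replace-with-upsert behavior is easy to see in a small imitation. The function below is not the driver's API: it is a hypothetical dict-based helper that mimics only the two properties that matter here (the stored document is swapped wholesale, and it is created if missing when `upsert` is true), and it returns a plain boolean rather than a driver result object.

```python
def replace_one(collection, doc_id, document, upsert=False):
    """In-memory imitation of replace-with-upsert on a dict 'collection'.

    Returns True if a document was written, False if nothing matched
    and upsert was off.
    """
    if doc_id in collection or upsert:
        # The whole document is replaced, so fields absent from the new
        # projection disappear instead of lingering as stale data.
        collection[doc_id] = dict(document)
        return True
    return False


reads = {}
# First projection: the ticket is new, so upsert creates it.
replace_one(reads, "t1", {"_id": "t1", "status": "open", "has_note": True}, upsert=True)
# Later projection: the same call overwrites the old document entirely.
replace_one(reads, "t1", {"_id": "t1", "status": "pending"}, upsert=True)
```

Because the document is replaced rather than patched, the projector can always rebuild the full read document from the write model and does not need separate insert and update code paths.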

Practical Examples

This architecture is a powerhouse for applications like analytics dashboards. Imagine a support system with millions of tickets. Instead of running expensive count or group aggregations on your main write collection, you query a read-optimized collection that only contains the necessary status flags. This keeps the write database responsive for agents while providing instant insights for managers.
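
As a rough illustration of the dashboard case, a per-status count over the read collection could look like the sketch below. The pipeline is an assumption about how such a query might be written against `ticket_reads`; the `count_by_status` helper is a hypothetical pure-Python equivalent of the `$group` stage, included so the logic can be run without a database.

```python
from collections import Counter

# Aggregation pipeline that could be passed to db.ticket_reads.aggregate(...).
# It groups the slim read documents by status and counts each group.
STATUS_COUNTS_PIPELINE = [
    {"$group": {"_id": "$status", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]


def count_by_status(read_docs):
    """Pure-Python equivalent of the $group stage, for illustration only."""
    return dict(Counter(doc["status"] for doc in read_docs))


sample_reads = [
    {"_id": "t1", "status": "open"},
    {"_id": "t2", "status": "open"},
    {"_id": "t3", "status": "pending"},
]
status_counts = count_by_status(sample_reads)
```

Since the read documents carry only the fields the dashboard needs, the grouping scans far less data than the same query over full ticket documents with message bodies and notes.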

Tips and Gotchas

The biggest trade-off is eventual consistency. Because there is a small delay between the command execution and the projection, a user might not see their change immediately if they refresh the page right away. You must also handle projection failures. If the projector crashes, your read model will be stale. In production systems, consider using a background task queue or MongoDB Change Streams to handle projections asynchronously and reliably.
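
One way to reason about projection failures is a retry loop over a queue of pending ticket ids. The sketch below is a simplified, in-process illustration using only the standard library; `drain_projection_queue` and its `max_attempts` parameter are hypothetical names, and a real deployment would more likely delegate this to a task queue or a change-stream consumer.

```python
from collections import deque


def drain_projection_queue(pending, project, max_attempts=3):
    """Run project(ticket_id) for every queued id, re-queueing failures.

    Returns the ids that still failed after max_attempts tries.
    """
    failed = []
    attempts = {}
    while pending:
        ticket_id = pending.popleft()
        attempts[ticket_id] = attempts.get(ticket_id, 0) + 1
        try:
            project(ticket_id)
        except Exception:
            if attempts[ticket_id] < max_attempts:
                pending.append(ticket_id)  # try again after the others
            else:
                failed.append(ticket_id)  # surface for alerting or replay
    return failed
```

Because the projector rebuilds the whole read document from the write model, running it twice for the same ticket is harmless, which is what makes blind retries safe here. Ids returned in the failed list mark read documents that are known to be stale and need a manual or scheduled replay.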
