CQRS in Python: Decoupling Reads and Writes with FastAPI and MongoDB
The Core Concept of CQRS
Command Query Responsibility Segregation (CQRS) fundamentally changes how we interact with data by splitting the application into two distinct paths. Commands handle operations that change state, like creating or updating a ticket. Queries handle the retrieval of data. In a standard CRUD application, the same data model often serves both purposes, leading to performance bottlenecks when read requirements—like complex dashboard aggregates or derived fields—start to interfere with write performance.
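As a rough illustration of the split, the sketch below keeps the two sides as plain dictionaries. The names write_store, read_store, create_ticket, and list_open_tickets are hypothetical stand-ins chosen for this example, and a real application would use a database rather than in-memory dicts.

```python
# In-memory stand-ins for the two sides (assumption: real code would use a database)
write_store = {}  # full ticket documents, the source of truth
read_store = {}   # small, pre-shaped rows for list views


def create_ticket(ticket_id, subject, message):
    """Command: changes state and returns nothing about the stored data."""
    write_store[ticket_id] = {"subject": subject, "message": message, "status": "open"}
    # Keep the read side in step with the write side
    read_store[ticket_id] = {"subject": subject, "status": "open"}


def list_open_tickets():
    """Query: touches only the read store and never mutates anything."""
    return sorted(row["subject"] for row in read_store.values() if row["status"] == "open")
```

The query never loads the full message body, which is the point of giving reads their own shape.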

Prerequisites and Key Tools
To implement this pattern, you should be comfortable with Python, FastAPI, and MongoDB. The code below also uses Pydantic for validation.
Code Walkthrough: Splitting the Model
We start by separating our single collection into two: ticket_commands for the source of truth and ticket_reads for our optimized views. Instead of one generic update endpoint, we create specific commands that represent business intent.
from pydantic import BaseModel, field_validator


class UpdateStatus(BaseModel):
    status: str

    @field_validator("status")
    @classmethod
    def status_must_not_be_closed(cls, v):
        # Closing a ticket is a separate business action, so reject it here
        if v == "closed":
            raise ValueError("Cannot manually close via this command")
        return v


def update_status_command(db, ticket_id, command: UpdateStatus):
    # Business logic lives here, isolated from the API
    result = db.ticket_commands.update_one(
        {"_id": ticket_id},
        {"$set": {"status": command.status}}
    )
    # matched_count is 0 when no document has this _id
    if result.matched_count == 0:
        raise ValueError("Ticket not found")
By moving logic into these command functions, the business rules stay isolated from the API layer.
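To show what "specific commands that represent business intent" could look like beyond a single status update, here is a minimal sketch of routing several commands to their own handlers. It uses standard-library dataclasses rather than Pydantic so it runs without extra packages, and ReopenTicket, AddNote, HANDLERS, and dispatch are hypothetical names, not part of the original walkthrough. The store argument is a plain dict standing in for the write collection.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReopenTicket:
    ticket_id: str


@dataclass(frozen=True)
class AddNote:
    ticket_id: str
    note: str

    def __post_init__(self):
        # Validate the command itself before it reaches any handler
        if not self.note.strip():
            raise ValueError("Note must not be empty")


def handle_reopen(store, command):
    store[command.ticket_id]["status"] = "open"


def handle_add_note(store, command):
    store[command.ticket_id]["note"] = command.note


# One handler per intent instead of one generic "update" function
HANDLERS = {ReopenTicket: handle_reopen, AddNote: handle_add_note}


def dispatch(store, command):
    handler = HANDLERS.get(type(command))
    if handler is None:
        raise TypeError(f"No handler for {type(command).__name__}")
    if command.ticket_id not in store:
        raise ValueError("Ticket not found")
    handler(store, command)
```

Each command type carries only the fields its intent needs, so a caller cannot slip an arbitrary field change through a generic update.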
Implementing the Projector
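The projector is where the read model gets its speed: it pre-computes derived fields like the message preview and the has_note flag so the list endpoint doesn't have to calculate them on the fly.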
def project_ticket(db, ticket_id):
    # Fetch from write model
    ticket = db.ticket_commands.find_one({"_id": ticket_id})
    if ticket is None:
        # find_one returns None when nothing matches, so there is nothing to project
        return
    # Prepare optimized read model
    projection = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],  # Pre-computed preview
        "has_note": "note" in ticket
    }
    # Update the read collection
    db.ticket_reads.replace_one({"_id": ticket_id}, projection, upsert=True)
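One way to try the projector without a running MongoDB is a small in-memory test double. The sketch below is an assumption-laden stand-in: InMemoryCollection and make_fake_db are hypothetical helpers that support only {"_id": value} filters, and project_ticket is repeated here so the block runs on its own.

```python
from types import SimpleNamespace


class InMemoryCollection:
    """Hypothetical test double; handles only {"_id": value} filters."""

    def __init__(self):
        self.docs = {}

    def insert_one(self, document):
        self.docs[document["_id"]] = dict(document)

    def find_one(self, query):
        doc = self.docs.get(query["_id"])
        return dict(doc) if doc is not None else None

    def replace_one(self, query, replacement, upsert=False):
        # Replace an existing document, or create it when upsert is set
        if query["_id"] in self.docs or upsert:
            self.docs[query["_id"]] = dict(replacement)


def make_fake_db():
    return SimpleNamespace(
        ticket_commands=InMemoryCollection(),
        ticket_reads=InMemoryCollection(),
    )


def project_ticket(db, ticket_id):
    ticket = db.ticket_commands.find_one({"_id": ticket_id})
    if ticket is None:
        return  # nothing to project
    projection = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],
        "has_note": "note" in ticket,
    }
    db.ticket_reads.replace_one({"_id": ticket_id}, projection, upsert=True)
```

Because the projector replaces the whole read document, running it twice for the same ticket leaves a single, identical row, which makes it safe to re-run.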
Syntax Notes and Conventions
In this implementation, we use replace_one with upsert=True in the projector. This ensures that the read model stays in sync whether the ticket is new or being updated. We also rely on
Practical Examples
This architecture is a powerhouse for applications like analytics dashboards. Imagine a support system with millions of tickets. Instead of running expensive count or group aggregations on your main production collection, you query a read-optimized collection that only contains the necessary status flags. This keeps the write database responsive for agents while providing instant insights for managers.
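A dashboard query against the read side might look like the following sketch. It assumes a PyMongo-style db object and the ticket_reads collection from the walkthrough. The function names status_count_pipeline and ticket_counts_by_status are hypothetical.

```python
def status_count_pipeline():
    """Aggregation stages that count read-model documents per status."""
    return [
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]


def ticket_counts_by_status(db):
    """Return {status: count}, touching only the read collection."""
    rows = db.ticket_reads.aggregate(status_count_pipeline())
    return {row["_id"]: row["count"] for row in rows}
```

The grouping still costs something, but it runs over small read documents and never competes with writes on ticket_commands.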
Tips and Gotchas
The biggest trade-off is eventual consistency. Because there is a short delay between the command execution and the projection, a user might not see their change if they refresh the page immediately. You must also handle projection failures. If the projector crashes, your read model will be stale. In production systems, consider running projections through a background task queue.
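For projection failures, a minimal sketch of two common safeguards is shown here: retrying a projection with exponential backoff, and rebuilding the whole read model from the write model. Both helpers, project_with_retry and rebuild_read_model, are hypothetical, take the projector as a parameter, and assume a PyMongo-style db. A task queue would normally own the retry loop instead.

```python
import time


def project_with_retry(project_fn, db, ticket_id, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call project_fn(db, ticket_id), retrying on any exception.

    Returns True on success and False once all attempts are used up,
    so the caller can record the ticket for a later rebuild.
    """
    for attempt in range(1, attempts + 1):
        try:
            project_fn(db, ticket_id)
            return True
        except Exception:
            if attempt == attempts:
                return False
            # Wait 0.1s, 0.2s, 0.4s, ... between attempts
            sleep(base_delay * 2 ** (attempt - 1))
    return False


def rebuild_read_model(db, project_fn):
    """Re-project every ticket in the write model; returns how many were processed."""
    count = 0
    for doc in db.ticket_commands.find({}, {"_id": 1}):
        project_fn(db, doc["_id"])
        count += 1
    return count
```

Since the write collection remains the source of truth, a stale or corrupted read collection can always be regenerated this way.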
