The Core Concept of CQRS

Command Query Responsibility Segregation (CQRS) changes how an application interacts with data by splitting it into two distinct paths. Commands handle operations that change state, like creating or updating a ticket. Queries handle the retrieval of data. In a standard CRUD application, the same data model often serves both purposes, which leads to performance bottlenecks when read requirements (complex dashboard aggregates, derived fields) start to interfere with write performance. CQRS addresses this by letting you optimize the read and write models independently.

Prerequisites and Key Tools

To implement this pattern, you should be comfortable with Python and basic asynchronous programming. Familiarity with FastAPI for building APIs and Pydantic for data validation is essential. For storage, we use MongoDB through the PyMongo driver, because its document model handles the varying shapes of read projections well.

Code Walkthrough: Splitting the Model

We start by separating our single collection into two: `ticket_commands` for the source of truth and `ticket_reads` for our optimized views. Instead of one generic update endpoint, we create specific commands that represent business intent.

```python
from pydantic import BaseModel, field_validator


class UpdateStatus(BaseModel):
    status: str

    @field_validator("status")
    @classmethod
    def status_must_not_be_closed(cls, v):
        # Closing a ticket is a separate business action, so reject it here
        if v == "closed":
            raise ValueError("Cannot manually close via this command")
        return v


def update_status_command(db, ticket_id, command: UpdateStatus):
    # Business logic lives here, isolated from the API
    result = db.ticket_commands.update_one(
        {"_id": ticket_id},
        {"$set": {"status": command.status}},
    )
    # matched_count is 0 when no document has this _id
    if result.matched_count == 0:
        raise ValueError("Ticket not found")
```

By moving logic into these command functions, the FastAPI endpoints become thin wrappers. This isolation keeps your business rules consistent regardless of how the data is displayed.
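To make the "thin wrapper" idea concrete, here is a minimal, dependency-free sketch. It is not the article's FastAPI code: `InMemoryCollection`, `UpdateResult`, and `handle_update_status` are hypothetical names, the in-memory class only imitates the single `update_one` call used by the command, and the command takes a plain string instead of a Pydantic model so the example runs with the standard library alone.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class UpdateResult:
    # Mirrors the one attribute of PyMongo's result that the command reads
    matched_count: int


class InMemoryCollection:
    """Hypothetical stand-in for a collection; supports only $set updates by _id."""

    def __init__(self):
        self.docs = {}

    def update_one(self, filter, update):
        doc = self.docs.get(filter["_id"])
        if doc is None:
            return UpdateResult(matched_count=0)
        doc.update(update["$set"])
        return UpdateResult(matched_count=1)


def update_status_command(db, ticket_id, status):
    # The business rule lives in the command, not in the endpoint
    if status == "closed":
        raise ValueError("Cannot manually close via this command")
    result = db.ticket_commands.update_one(
        {"_id": ticket_id}, {"$set": {"status": status}}
    )
    if result.matched_count == 0:
        raise ValueError("Ticket not found")


def handle_update_status(db, ticket_id, status):
    """Thin wrapper: run the command and translate errors into a status code."""
    try:
        update_status_command(db, ticket_id, status)
    except ValueError as exc:
        return 400, {"detail": str(exc)}
    return 200, {"status": status}


db = SimpleNamespace(ticket_commands=InMemoryCollection())
db.ticket_commands.docs["t1"] = {"_id": "t1", "status": "open"}

ok = handle_update_status(db, "t1", "pending")
rejected = handle_update_status(db, "t1", "closed")
missing = handle_update_status(db, "nope", "pending")
```

The wrapper contains no rules of its own, so the same command could sit behind an HTTP endpoint, a CLI, or a test. A real endpoint would probably distinguish "not found" from "invalid" with separate exception types rather than mapping every `ValueError` to 400.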
Implementing the Projector

The core of this CQRS setup is the projection phase. Every time a command modifies the write database, we trigger a projector function to update the read model. This read model includes pre-computed fields like `preview` so the list endpoint doesn't have to calculate them on the fly.

```python
def project_ticket(db, ticket_id):
    # Fetch from write model
    ticket = db.ticket_commands.find_one({"_id": ticket_id})
    if ticket is None:
        # Assumed policy: if the ticket is gone, drop any stale read document
        db.ticket_reads.delete_one({"_id": ticket_id})
        return

    # Prepare optimized read model
    projection = {
        "_id": ticket["_id"],
        "subject": ticket["subject"],
        "status": ticket["status"],
        "preview": ticket["message"][:50],  # Pre-computed preview
        "has_note": "note" in ticket,
    }

    # Update the read collection
    db.ticket_reads.replace_one({"_id": ticket_id}, projection, upsert=True)
```

Syntax Notes and Conventions

In this implementation, we use Pydantic for more than just validation; we use it to define the contract of our commands. Note the use of `replace_one` with `upsert=True` in the projector: it inserts the read document if none exists and replaces it otherwise, so the read model stays in sync whether the ticket is new or being updated. We also rely on FastAPI dependency injection to pass the database handle into our commands and queries.

Practical Examples

This architecture is well suited to applications like analytics dashboards. Imagine a support system with millions of tickets. Instead of running expensive `count` or `group` aggregations on your main production collection, you query a read-optimized collection that contains only the necessary status flags. This keeps the write database responsive for agents while giving managers fast access to the numbers.

Tips and Gotchas

The biggest trade-off is eventual consistency. Because there is a short delay between the command execution and the projection, a user might not see their change if they refresh the page immediately. You must also handle projection failures: if the projector crashes, your read model will be stale until the projection is run again.
In production systems, consider running projections asynchronously and reliably, either through a background task queue such as Celery or by reacting to MongoDB Change Streams.
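The retry idea behind "handle projection failures" can be sketched without any infrastructure. The example below is an assumption-laden illustration, not Celery or Change Streams: it uses an in-process `deque` as the queue, and `drain_projection_queue`, `flaky_project`, and `max_attempts` are hypothetical names. A real worker would also add backoff and persist the queue.

```python
from collections import deque


def drain_projection_queue(pending, project, max_attempts=3):
    """Run project(ticket_id) for each queued id, retrying failures.

    Returns the ids that still failed after max_attempts tries, so the
    caller can log them or park them for manual repair.
    """
    attempts = {}
    failed = []
    while pending:
        ticket_id = pending.popleft()
        try:
            project(ticket_id)
        except Exception:
            attempts[ticket_id] = attempts.get(ticket_id, 0) + 1
            if attempts[ticket_id] < max_attempts:
                pending.append(ticket_id)  # retry after the other queued work
            else:
                failed.append(ticket_id)
    return failed


# Demo projector: "t2" fails on its first call only, "t3" fails every time.
projected = []
calls = {}


def flaky_project(ticket_id):
    calls[ticket_id] = calls.get(ticket_id, 0) + 1
    if ticket_id == "t3" or (ticket_id == "t2" and calls[ticket_id] == 1):
        raise RuntimeError("projection failed")
    projected.append(ticket_id)


failed = drain_projection_queue(deque(["t1", "t2", "t3"]), flaky_project)
```

Because the projector rebuilds the whole read document from the write model, running it twice for the same ticket is harmless, which is what makes blind retries safe here. Ids returned in `failed` mark read documents that are known to be stale.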