🏗️ FASE 2: Arquitectura y Diseño de Sistemas
Recetas prácticas para diseñar sistemas escalables y robustos
📚 Tabla de Contenidos
- Receta 2.1: Monolito vs Microservicios - ¿Cuándo usar cada uno?
- Receta 2.2: Event-Driven Architecture
- Receta 2.3: CQRS y Event Sourcing
- Receta 2.4: CAP Theorem y Consistencia Eventual
- Receta 2.5: Load Balancing, Caching y Queues
Diseño de Software Escalable
Receta 2.1: Monolito vs Microservicios - ¿Cuándo usar cada uno?
¿Qué son?
- Monolito: Aplicación única donde todos los componentes están acoplados
- Microservicios: Aplicación dividida en servicios pequeños e independientes
Comparación:
| Característica | Monolito | Microservicios |
|---|---|---|
| Complejidad inicial | 🟢 Baja | 🔴 Alta |
| Deployment | Simple (1 deploy) | Complejo (N deploys) |
| Escalabilidad | Vertical solamente | Horizontal por servicio |
| Testing | 🟢 Fácil E2E | 🔴 Difícil E2E |
| Desarrollo inicial | 🟢 Rápido | 🔴 Lento |
| Gestión | Simple | Requiere DevOps/SRE |
| Debugging | 🟢 Fácil | 🔴 Trazas distribuidas |
| Tecnología | Stack único | Poliglota |
Caso de uso: eCommerce
# ===== MONOLITO =====
from flask import Flask, request, jsonify
from datetime import datetime
import sqlite3

# One Flask app hosts every concern of the monolith
app = Flask(__name__)
class MonolithEcommerce:
    """All ecommerce concerns (users, catalog, orders, email) in one app."""

    def __init__(self, db_path='ecommerce.db'):
        # db_path is parameterized (default unchanged) so tests can pass ':memory:'
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create every table in the single shared database."""
        cursor = self.db.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                email TEXT,
                name TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT,
                price REAL,
                stock INTEGER
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                total REAL,
                status TEXT,
                created_at TEXT
            )
        ''')
        self.db.commit()

    def create_order(self, user_id, product_ids):
        """Whole order flow in one place.

        Returns a (payload, http_status) tuple: 201 on success, 404 for an
        unknown user, 400 when any product is missing or under-stocked.
        """
        cursor = self.db.cursor()
        # 1. Validate the user
        cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
        user = cursor.fetchone()
        if not user:
            return {'error': 'User not found'}, 404
        # 2. Aggregate quantities so duplicated product ids are checked against
        #    the REAL requested quantity (the original checked each occurrence
        #    against stock >= 1, letting stock go negative).
        quantities = {}
        for product_id in product_ids:
            quantities[product_id] = quantities.get(product_id, 0) + 1
        total = 0
        for product_id, qty in quantities.items():
            cursor.execute('SELECT price, stock FROM products WHERE id = ?', (product_id,))
            product = cursor.fetchone()
            if not product or product[1] < qty:
                return {'error': f'Product {product_id} unavailable'}, 400
            total += product[0] * qty
        # 3. Create the order
        cursor.execute('''
            INSERT INTO orders (user_id, total, status, created_at)
            VALUES (?, ?, ?, ?)
        ''', (user_id, total, 'pending', datetime.now().isoformat()))
        order_id = cursor.lastrowid
        # 4. Decrement stock by the aggregated quantity
        for product_id, qty in quantities.items():
            cursor.execute('''
                UPDATE products SET stock = stock - ? WHERE id = ?
            ''', (qty, product_id))
        # Commit BEFORE the side-effecting email so a crash in email sending
        # cannot leave a confirmed-but-unpersisted order.
        self.db.commit()
        # 5. Send email (simulated)
        self._send_order_email(user[1], total)
        return {'order_id': order_id, 'total': total}, 201

    def _send_order_email(self, email, total):
        """Email baked into the monolith (simulated)."""
        print(f"📧 Sending order confirmation to {email} for ${total}")
monolith = MonolithEcommerce()


@app.route('/orders', methods=['POST'])
def create_order():
    """HTTP entry point: delegate the whole flow to the monolith object."""
    payload = request.json
    body, status_code = monolith.create_order(payload['user_id'], payload['product_ids'])
    return jsonify(body), status_code
# ===== MICROSERVICIOS =====
import requests
from flask import Flask

# Servicio 1: Users
user_service = Flask('users')
users_db = {}


@user_service.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return the stored user record, or a 404 error payload."""
    record = users_db.get(user_id)
    if not record:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(record), 200
# Servicio 2: Products/Inventory
product_service = Flask('products')
products_db = {
    1: {'name': 'Laptop', 'price': 999.99, 'stock': 10},
    2: {'name': 'Mouse', 'price': 29.99, 'stock': 50}
}


@product_service.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Fetch one catalog entry; 404 when the id is unknown."""
    entry = products_db.get(product_id)
    if not entry:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(entry), 200
@product_service.route('/products/<int:product_id>/reserve', methods=['POST'])
def reserve_stock(product_id):
    """Reserve a single unit of stock (inventory management)."""
    entry = products_db.get(product_id)
    if not entry or entry['stock'] < 1:
        return jsonify({'error': 'Out of stock'}), 400
    entry['stock'] -= 1
    return jsonify({'reserved': True}), 200
# Servicio 3: Orders (Orquestador)
order_service = Flask('orders')
orders_db = {}


class OrderOrchestrator:
    """Coordinates the user, product and notification services for one order."""

    USER_SERVICE_URL = "http://localhost:5001"
    PRODUCT_SERVICE_URL = "http://localhost:5002"
    NOTIFICATION_SERVICE_URL = "http://localhost:5003"

    def create_order(self, user_id, product_ids):
        """Distributed order flow with compensating actions (saga).

        Returns a (payload, http_status) tuple; 504/503 when a downstream
        service times out or is unreachable.
        """
        try:
            # 1. Validate the user (call to user-service)
            user_response = requests.get(
                f"{self.USER_SERVICE_URL}/users/{user_id}",
                timeout=2
            )
            if user_response.status_code != 200:
                return {'error': 'Invalid user'}, 400
            user = user_response.json()
            # 2. Reserve products (calls to product-service)
            total = 0
            reserved_products = []
            for product_id in product_ids:
                # Fetch the product
                product_response = requests.get(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}",
                    timeout=2
                )
                if product_response.status_code != 200:
                    # Compensate: release the products already reserved
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} not found'}, 400
                product = product_response.json()
                total += product['price']
                # Reserve stock
                reserve_response = requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/reserve",
                    timeout=2
                )
                if reserve_response.status_code != 200:
                    # Compensate
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} out of stock'}, 400
                reserved_products.append(product_id)
            # 3. Create the order
            order_id = len(orders_db) + 1
            orders_db[order_id] = {
                'id': order_id,
                'user_id': user_id,
                'products': product_ids,
                'total': total,
                'status': 'confirmed',
                'created_at': datetime.now().isoformat()
            }
            # 4. Notify (async, fire-and-forget)
            try:
                requests.post(
                    f"{self.NOTIFICATION_SERVICE_URL}/send",
                    json={
                        'email': user['email'],
                        'template': 'order_confirmation',
                        'data': {'order_id': order_id, 'total': total}
                    },
                    timeout=1
                )
            except Exception:
                # Best-effort: never fail the order because the email failed.
                # (Was a bare `except:`, which also swallowed SystemExit and
                # KeyboardInterrupt.)
                print("⚠️ Notification failed, but order created")
            return {'order_id': order_id, 'total': total}, 201
        except requests.Timeout:
            return {'error': 'Service timeout'}, 504
        except requests.ConnectionError:
            return {'error': 'Service unavailable'}, 503

    def _rollback_reservations(self, product_ids):
        """Saga pattern: compensate reservations that already succeeded."""
        for product_id in product_ids:
            try:
                requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/release",
                    timeout=1
                )
            except Exception:
                # Bare except replaced; rollback remains best-effort by design
                print(f"⚠️ Failed to rollback product {product_id}")
orchestrator = OrderOrchestrator()


@order_service.route('/orders', methods=['POST'])
def create_order_micro():
    """HTTP entry point: hand the distributed flow to the orchestrator."""
    payload = request.json
    body, status_code = orchestrator.create_order(payload['user_id'], payload['product_ids'])
    return jsonify(body), status_code
# Servicio 4: Notifications (async)
notification_service = Flask('notifications')


@notification_service.route('/send', methods=['POST'])
def send_notification():
    """Standalone notification endpoint (independent service)."""
    payload = request.json
    print(f"📧 Sending {payload['template']} to {payload['email']}")
    # A real implementation would integrate SendGrid, SES, etc.
    return jsonify({'sent': True}), 200
Cuándo usar Monolito:
- ✅ Startup/MVP (velocidad de desarrollo)
- ✅ Equipo pequeño (<10 devs)
- ✅ Dominio bien definido y estable
- ✅ Tráfico bajo/medio (<10k users)
- ✅ Sin necesidad de escalar componentes independientemente
Cuándo usar Microservicios:
- ✅ Equipos grandes (múltiples equipos)
- ✅ Necesidad de escalar componentes independientemente
- ✅ Diferentes tecnologías por componente
- ✅ Deployment independiente crítico
- ✅ Alta disponibilidad (fallas aisladas)
⚠️ La regla de oro:
“Start with a monolith, evolve to microservices when pain points are clear”
— Martin Fowler
Receta 2.2: Event-Driven Architecture
¿Qué es? Arquitectura donde componentes se comunican mediante eventos (mensajes) en lugar de llamadas síncronas directas.
Ventajas:
- ✅ Desacoplamiento total entre servicios
- ✅ Escalabilidad independiente
- ✅ Resiliencia (servicios pueden estar down temporalmente)
- ✅ Auditoría completa (event log)
Componentes clave:
from dataclasses import dataclass
from datetime import datetime
from typing import List, Callable
import json
from queue import Queue
import threading
# ===== Event Bus =====
@dataclass
class Event:
    """Immutable-by-convention event record carried on the bus."""
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime
    metadata: dict = None

    def to_json(self):
        """Serialize into a plain JSON-ready dict (metadata defaults to {})."""
        payload = {
            'event_type': self.event_type,
            'aggregate_id': self.aggregate_id,
            'data': self.data,
            'timestamp': self.timestamp.isoformat(),
        }
        payload['metadata'] = self.metadata or {}
        return payload
class EventBus:
    """In-memory event bus (in production: Kafka, RabbitMQ, SQS)."""

    def __init__(self):
        self.subscribers = {}  # {event_type: [handlers]}
        self.event_log = []    # append-only store of every published event

    def publish(self, event: "Event"):
        """Publish an event: log it and fan it out to subscribers.

        Each handler runs on its own thread. Errors are caught *inside* the
        worker: the original wrapped ``Thread(...).start()`` in try/except,
        which can never observe an exception raised later in the handler's
        own thread.
        """
        print(f"📢 Published: {event.event_type}")
        self.event_log.append(event)
        handlers = self.subscribers.get(event.event_type, [])
        for handler in handlers:
            def _invoke(h=handler):
                try:
                    # In production this dispatch would go through a queue
                    h(event)
                except Exception as e:
                    print(f"❌ Handler error: {e}")
            threading.Thread(target=_invoke).start()

    def subscribe(self, event_type: str, handler: Callable):
        """Register a handler for one event type."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        print(f"✓ Subscribed to {event_type}")

    def replay_events(self, aggregate_id: str):
        """Return this aggregate's events in publish order (Event Sourcing)."""
        return [e for e in self.event_log if e.aggregate_id == aggregate_id]
# ===== Ejemplo: Sistema de pedidos =====
event_bus = EventBus()


# Servicio 1: Order Service (publica eventos)
class OrderService:
    """Owns orders; announces changes on the bus instead of calling peers."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}

    def create_order(self, order_id: str, user_id: str, items: List[dict]):
        """Store the order locally, then publish OrderCreated."""
        order_total = sum(entry['price'] * entry['qty'] for entry in items)
        new_order = {
            'id': order_id,
            'user_id': user_id,
            'items': items,
            'status': 'created',
            'total': order_total
        }
        self.orders[order_id] = new_order
        # Publish an event — do not call other services directly
        self.event_bus.publish(Event(
            event_type='OrderCreated',
            aggregate_id=order_id,
            data=new_order,
            timestamp=datetime.now(),
            metadata={'user_id': user_id}
        ))
        return new_order

    def confirm_payment(self, order_id: str):
        """Mark the order as paid and publish OrderPaid."""
        paid_order = self.orders[order_id]
        paid_order['status'] = 'paid'
        self.event_bus.publish(Event(
            event_type='OrderPaid',
            aggregate_id=order_id,
            data={'order_id': order_id, 'total': paid_order['total']},
            timestamp=datetime.now()
        ))
# Servicio 2: Inventory Service (reacciona a eventos)
class InventoryService:
    """Reserves stock in reaction to OrderCreated events."""

    def __init__(self, event_bus: EventBus):
        # Keep the injected bus: handlers previously published through the
        # module-level global, which silently broke whenever a different bus
        # instance was injected.
        self.event_bus = event_bus
        self.inventory = {
            'laptop': 10,
            'mouse': 50
        }
        # Subscribe to the events this service cares about
        event_bus.subscribe('OrderCreated', self.handle_order_created)

    def handle_order_created(self, event: Event):
        """Reserve stock when an order is created; publish the outcome."""
        print(f"\n🏪 InventoryService processing {event.event_type}")
        items = event.data['items']
        order_id = event.aggregate_id
        # Validate and reserve stock item by item
        for item in items:
            product = item['product']
            qty = item['qty']
            if self.inventory.get(product, 0) < qty:
                # Announce the failure instead of raising
                self.event_bus.publish(Event(
                    event_type='InventoryReservationFailed',
                    aggregate_id=order_id,
                    data={'product': product, 'requested': qty},
                    timestamp=datetime.now()
                ))
                return
            self.inventory[product] -= qty
        # Success: announce the reservation
        self.event_bus.publish(Event(
            event_type='InventoryReserved',
            aggregate_id=order_id,
            data={'items': items},
            timestamp=datetime.now()
        ))
# Servicio 3: Payment Service
class PaymentService:
    """Charges the order once inventory has been reserved."""

    def __init__(self, event_bus: EventBus):
        # Keep the injected bus: the handler previously published through the
        # module-level global, breaking injection of a different bus instance.
        self.event_bus = event_bus
        event_bus.subscribe('InventoryReserved', self.handle_inventory_reserved)

    def handle_inventory_reserved(self, event: Event):
        """Process payment once stock is reserved; publish the outcome."""
        print(f"\n💳 PaymentService processing {event.event_type}")
        order_id = event.aggregate_id
        # Simulated payment processing
        success = True  # a real system would call Stripe, etc.
        if success:
            self.event_bus.publish(Event(
                event_type='PaymentSucceeded',
                aggregate_id=order_id,
                data={'payment_id': 'pay_123'},
                timestamp=datetime.now()
            ))
        else:
            self.event_bus.publish(Event(
                event_type='PaymentFailed',
                aggregate_id=order_id,
                data={'reason': 'Insufficient funds'},
                timestamp=datetime.now()
            ))
# Servicio 4: Notification Service
class NotificationService:
    """Listens for lifecycle events and prints the matching notification."""

    def __init__(self, event_bus: EventBus):
        subscriptions = {
            'OrderCreated': self.send_order_confirmation,
            'PaymentSucceeded': self.send_payment_confirmation,
            'InventoryReservationFailed': self.send_failure_notification,
        }
        for event_type, handler in subscriptions.items():
            event_bus.subscribe(event_type, handler)

    def send_order_confirmation(self, event: Event):
        print(f"\n📧 Sending order confirmation for {event.aggregate_id}")

    def send_payment_confirmation(self, event: Event):
        print(f"\n📧 Sending payment confirmation for {event.aggregate_id}")

    def send_failure_notification(self, event: Event):
        print(f"\n📧 Sending failure notification for {event.aggregate_id}")
# ===== Demostración =====
print("=== Event-Driven Architecture Demo ===\n")

# Wire every service to the shared bus; each __init__ registers its handlers.
order_service = OrderService(event_bus)
inventory_service = InventoryService(event_bus)
payment_service = PaymentService(event_bus)
notification_service = NotificationService(event_bus)

# Creating an order kicks off the whole chain of subscribed handlers.
import time
order = order_service.create_order(
    order_id='order_001',
    user_id='user_123',
    items=[
        {'product': 'laptop', 'price': 999, 'qty': 1},
        {'product': 'mouse', 'price': 29, 'qty': 2}
    ]
)

# Handlers run on background threads; give them time to finish.
time.sleep(0.5)

print("\n=== Event Log ===")
for event in event_bus.event_log:
    print(f"- {event.event_type} @ {event.timestamp.strftime('%H:%M:%S')}")

# Rebuild state from the stored events (Event Sourcing)
print("\n=== Replay Events for order_001 ===")
events = event_bus.replay_events('order_001')
for event in events:
    print(f"- {event.event_type}: {event.data}")
Patrones de mensajería:
# 1. PUBLISH-SUBSCRIBE (1:N)
# One event, several independent consumers (handlers defined elsewhere)
event_bus.subscribe('UserRegistered', send_welcome_email)
event_bus.subscribe('UserRegistered', create_default_preferences)
event_bus.subscribe('UserRegistered', send_to_analytics)

# 2. POINT-TO-POINT (1:1)
# Task queue: each message is consumed by exactly one worker
task_queue = Queue()
task_queue.put({'task': 'send_email', 'to': 'user@example.com'})

# Workers compete for tasks
def worker():
    while True:
        task = task_queue.get()
        process_task(task)  # NOTE(review): process_task is not defined in this snippet
        task_queue.task_done()

# 3. REQUEST-REPLY (simulated synchronous exchange)
# The client waits for a reply correlated to its request
def request_with_correlation():
    """Publish a request tagged with a correlation id, then await the reply."""
    correlation_id = 'req_123'
    # Publish the request
    event_bus.publish(Event(
        event_type='GetUserRequest',
        aggregate_id=correlation_id,
        data={'user_id': '456'},
        timestamp=datetime.now()
    ))
    # Wait for the response (normally with a timeout)
    # In production: use a dedicated reply queue
Tecnologías para Event-Driven:
- Message Brokers: RabbitMQ, Apache Kafka, AWS SQS/SNS, Google Pub/Sub
- Event Stores: EventStoreDB, Apache Kafka (también puede ser store)
- Streaming: Apache Kafka, AWS Kinesis, Apache Flink
Receta 2.3: CQRS y Event Sourcing
¿Qué es CQRS? Command Query Responsibility Segregation: Separar modelos de escritura (Commands) y lectura (Queries).
¿Qué es Event Sourcing? Almacenar cambios de estado como secuencia de eventos en lugar del estado actual.
Caso de uso: Sistema bancario
from typing import List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
# ===== EVENT SOURCING =====
class EventType(Enum):
    """Closed set of banking domain event types."""
    ACCOUNT_CREATED = "account_created"
    MONEY_DEPOSITED = "money_deposited"
    MONEY_WITHDRAWN = "money_withdrawn"
    ACCOUNT_FROZEN = "account_frozen"


@dataclass
class DomainEvent:
    """Immutable domain event."""
    event_id: str
    event_type: EventType
    aggregate_id: str  # Account ID
    data: dict  # payload whose shape depends on event_type
    timestamp: datetime = field(default_factory=datetime.now)
    version: int = 1  # position within the aggregate's event stream
class EventStore:
    """Append-only event store (in production: EventStoreDB, PostgreSQL)."""

    def __init__(self):
        # Production systems back this with an append-only DB table
        self.events = []

    def append(self, event: DomainEvent):
        """Record an event; the log is immutable, append-only."""
        self.events.append(event)
        print(f"✓ Event stored: {event.event_type.value} v{event.version}")

    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        """All events ever recorded for one aggregate, in append order."""
        matches = []
        for candidate in self.events:
            if candidate.aggregate_id == aggregate_id:
                matches.append(candidate)
        return matches

    def get_events_from_version(self, aggregate_id: str, from_version: int):
        """Events for an aggregate starting at a given version (snapshot support)."""
        return [
            candidate for candidate in self.events
            if candidate.aggregate_id == aggregate_id and candidate.version >= from_version
        ]
# ===== WRITE MODEL (Commands) =====
class AccountAggregate:
    """Aggregate that rebuilds its state by replaying events."""

    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0
        self.status = 'inactive'
        self.version = 0
        self.uncommitted_events = []

    def _record(self, event_type: EventType, data: dict):
        """Build the next-version event, apply it, and queue it for persistence.

        Extracted from the three commands, which repeated this block verbatim.
        """
        event = DomainEvent(
            event_id=f"evt_{self.version + 1}",
            event_type=event_type,
            aggregate_id=self.account_id,
            data=data,
            version=self.version + 1
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)

    def create_account(self, owner: str, initial_deposit: float):
        """Command: create the account (only valid while inactive)."""
        if self.status != 'inactive':
            raise ValueError("Account already exists")
        self._record(EventType.ACCOUNT_CREATED,
                     {'owner': owner, 'initial_deposit': initial_deposit})

    def deposit(self, amount: float):
        """Command: deposit a positive amount."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._record(EventType.MONEY_DEPOSITED, {'amount': amount})

    def withdraw(self, amount: float):
        """Command: withdraw, guarded by funds and frozen-status checks."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        if self.status == 'frozen':
            raise ValueError("Account is frozen")
        self._record(EventType.MONEY_WITHDRAWN, {'amount': amount})

    def _apply_event(self, event: DomainEvent):
        """Mutate state from one event — the only place state changes."""
        if event.event_type == EventType.ACCOUNT_CREATED:
            self.status = 'active'
            self.balance = event.data['initial_deposit']
        elif event.event_type == EventType.MONEY_DEPOSITED:
            self.balance += event.data['amount']
        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self.balance -= event.data['amount']
        elif event.event_type == EventType.ACCOUNT_FROZEN:
            self.status = 'frozen'
        self.version = event.version

    def load_from_history(self, events: List[DomainEvent]):
        """Rebuild current state by replaying events (Event Sourcing)."""
        for event in events:
            self._apply_event(event)

    def get_uncommitted_events(self):
        """Drain and return the events still pending persistence."""
        events = self.uncommitted_events
        self.uncommitted_events = []
        return events
# ===== READ MODEL (Queries) =====
class AccountReadModel:
    """Read-optimized projections (the Query side of CQRS).

    Each query gets its own pre-computed structure; projections are refreshed
    as events arrive, so reads stay O(1) at the cost of eventual consistency.
    """

    def __init__(self):
        # Different projections for different queries
        self.accounts_by_id = {}        # account_id -> account record
        self.accounts_by_balance = []   # records sorted by balance, descending
        self.transaction_history = {}   # {account_id: [transactions]}

    def project_event(self, event: DomainEvent):
        """Fold one domain event into every projection."""
        account_id = event.aggregate_id
        if event.event_type == EventType.ACCOUNT_CREATED:
            self.accounts_by_id[account_id] = {
                'id': account_id,
                'owner': event.data['owner'],
                'balance': event.data['initial_deposit'],
                'status': 'active',
                'created_at': event.timestamp
            }
            self.transaction_history[account_id] = []
        elif event.event_type == EventType.MONEY_DEPOSITED:
            self.accounts_by_id[account_id]['balance'] += event.data['amount']
            self.transaction_history[account_id].append({
                'type': 'deposit',
                'amount': event.data['amount'],
                'timestamp': event.timestamp,
                'balance_after': self.accounts_by_id[account_id]['balance']
            })
        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self.accounts_by_id[account_id]['balance'] -= event.data['amount']
            self.transaction_history[account_id].append({
                'type': 'withdrawal',
                'amount': event.data['amount'],
                'timestamp': event.timestamp,
                'balance_after': self.accounts_by_id[account_id]['balance']
            })
        # Refresh the balance-ordered index after every projected event
        self._rebuild_balance_index()

    def _rebuild_balance_index(self):
        """Re-sort the balance index (full O(n log n) sort; fine for a demo)."""
        self.accounts_by_balance = sorted(
            self.accounts_by_id.values(),
            key=lambda x: x['balance'],
            reverse=True
        )

    def get_account(self, account_id: str):
        """Query: account by id — O(1)."""
        return self.accounts_by_id.get(account_id)

    def get_richest_accounts(self, limit: int = 10):
        """Query: top accounts by balance — O(1) thanks to the prebuilt index."""
        return self.accounts_by_balance[:limit]

    def get_transaction_history(self, account_id: str):
        """Query: transaction history — O(1)."""
        return self.transaction_history.get(account_id, [])
# ===== COMMAND HANDLER =====
class BankingCommandHandler:
    """Handles commands: rehydrate aggregate, execute, persist, project.

    The load/persist boilerplate repeated in all three handlers is factored
    into `_load_aggregate` and `_persist`.
    """

    def __init__(self, event_store: EventStore, read_model: AccountReadModel):
        self.event_store = event_store
        self.read_model = read_model

    def _load_aggregate(self, account_id: str):
        """Rebuild the aggregate's current state from its event history."""
        account = AccountAggregate(account_id)
        account.load_from_history(self.event_store.get_events(account_id))
        return account

    def _persist(self, account):
        """Append pending events to the store and project them into the
        read model (eventual consistency)."""
        for event in account.get_uncommitted_events():
            self.event_store.append(event)
            self.read_model.project_event(event)

    def handle_create_account(self, account_id: str, owner: str, initial_deposit: float):
        """Command handler: create an account."""
        account = self._load_aggregate(account_id)
        account.create_account(owner, initial_deposit)
        self._persist(account)

    def handle_deposit(self, account_id: str, amount: float):
        """Command handler: deposit money."""
        account = self._load_aggregate(account_id)
        account.deposit(amount)
        self._persist(account)

    def handle_withdraw(self, account_id: str, amount: float):
        """Command handler: withdraw money."""
        account = self._load_aggregate(account_id)
        account.withdraw(amount)
        self._persist(account)
# ===== QUERY HANDLER =====
class BankingQueryHandler:
    """Read-only query side; every call delegates to the read model."""

    def __init__(self, read_model: AccountReadModel):
        self.read_model = read_model

    def get_account_balance(self, account_id: str):
        """Query: current balance, or None when the account is unknown."""
        found = self.read_model.get_account(account_id)
        if not found:
            return None
        return found['balance']

    def get_account_details(self, account_id: str):
        """Query: the full projected account record."""
        return self.read_model.get_account(account_id)

    def get_top_balances(self):
        """Query: the ten accounts with the highest balances."""
        return self.read_model.get_richest_accounts(10)

    def get_transactions(self, account_id: str):
        """Query: transaction history for one account."""
        return self.read_model.get_transaction_history(account_id)
# ===== DEMOSTRACIÓN =====
print("=== CQRS + Event Sourcing Demo ===\n")

# Wire up: one event store (write side), one read model (query side)
event_store = EventStore()
read_model = AccountReadModel()
command_handler = BankingCommandHandler(event_store, read_model)
query_handler = BankingQueryHandler(read_model)

# COMMANDS (write side)
print("--- COMMANDS ---")
command_handler.handle_create_account('ACC001', 'Alice', 1000.0)
command_handler.handle_deposit('ACC001', 500.0)
command_handler.handle_withdraw('ACC001', 200.0)
command_handler.handle_deposit('ACC001', 1000.0)
command_handler.handle_create_account('ACC002', 'Bob', 5000.0)
command_handler.handle_deposit('ACC002', 2000.0)

# QUERIES (read side, served entirely from the projections)
print("\n--- QUERIES ---")
balance = query_handler.get_account_balance('ACC001')
print(f"Alice's balance: ${balance}")

details = query_handler.get_account_details('ACC001')
print(f"Alice's account: {details}")

top_accounts = query_handler.get_top_balances()
print(f"\nTop accounts:")
for acc in top_accounts:
    print(f" {acc['owner']}: ${acc['balance']}")

transactions = query_handler.get_transactions('ACC001')
print(f"\nAlice's transactions:")
for tx in transactions:
    print(f" {tx['type']}: ${tx['amount']} -> Balance: ${tx['balance_after']}")

# EVENT SOURCING: rebuild the current state purely from stored events
print("\n--- EVENT SOURCING: Rebuild from history ---")
new_account = AccountAggregate('ACC001')
events = event_store.get_events('ACC001')
new_account.load_from_history(events)
print(f"Rebuilt balance: ${new_account.balance} (version {new_account.version})")
print(f"\nTotal events stored: {len(event_store.events)}")
Ventajas de CQRS + Event Sourcing:
- ✅ Auditoría completa (todos los cambios registrados)
- ✅ Time-travel (reconstruir estado en cualquier punto)
- ✅ Read models optimizados para cada query
- ✅ Escalabilidad (reads y writes separados)
- ✅ Replay de eventos (reprocessing, analytics)
Desventajas:
- ❌ Complejidad alta
- ❌ Eventual consistency
- ❌ Storage crece constantemente (mitigation: snapshots)
- ❌ Difícil de debuggear
Receta 2.4: CAP Theorem y Consistencia Eventual
CAP Theorem: En un sistema distribuido, solo puedes garantizar 2 de 3:
- Consistency: Todos ven los mismos datos al mismo tiempo
- Availability: Sistema siempre responde (aunque sea con datos viejos)
- Partition tolerance: Sistema funciona aunque haya fallos de red
Trade-offs:
| Sistema | Prioriza | Ejemplo |
|---|---|---|
| CP | Consistencia + Particiones | Bancos tradicionales, Zookeeper |
| AP | Disponibilidad + Particiones | DNS, Cassandra, DynamoDB |
| CA | Consistencia + Disponibilidad | RDBMS single-node (no distribuido) |
Implementación de consistencia eventual:
import time
import threading
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class WriteOperation:
    """One write, stamped with enough metadata for LWW conflict resolution."""
    key: str
    value: Any  # was annotated `any` — the builtin function, not a type
    timestamp: float  # wall-clock seconds (time.time()) at write time
    node_id: str  # originating node
class EventuallyConsistentStore:
    """Distributed key-value store with eventual consistency (the AP side of CAP)."""

    def __init__(self, node_id: str, replication_factor: int = 3):
        self.node_id = node_id
        self.replication_factor = replication_factor
        self.data: Dict[str, any] = {}
        self.vector_clock: Dict[str, int] = {}  # per-key version counter
        self.write_log: List[WriteOperation] = []
        # Per-key timestamp of the newest accepted write. Used for LWW conflict
        # resolution; the original compared an incoming write against the last
        # local write of *any* key and raised IndexError on an empty log.
        self.last_write_ts: Dict[str, float] = {}
        self.peer_nodes = []
        self.lock = threading.Lock()

    def connect_to_peer(self, peer_node):
        """Join another node to this node's replication cluster."""
        self.peer_nodes.append(peer_node)
        print(f"✓ {self.node_id} connected to {peer_node.node_id}")

    def write(self, key: str, value: any):
        """Write locally first (availability), then replicate asynchronously."""
        with self.lock:
            # 1. Local write (availability)
            self.data[key] = value
            self.vector_clock[key] = self.vector_clock.get(key, 0) + 1
            operation = WriteOperation(
                key=key,
                value=value,
                timestamp=time.time(),
                node_id=self.node_id
            )
            self.write_log.append(operation)
            self.last_write_ts[key] = operation.timestamp
            print(f"✓ {self.node_id}: Wrote {key}={value} locally")
            # 2. Replicate asynchronously (eventual consistency)
            self._replicate_async(operation)
            return True

    def _replicate_async(self, operation: WriteOperation):
        """Push the write to peers on a background thread."""
        def replicate():
            time.sleep(0.1)  # simulate network delay
            for peer in self.peer_nodes[:self.replication_factor - 1]:
                try:
                    peer._receive_replication(operation)
                except Exception as e:
                    print(f"⚠️ Replication failed to {peer.node_id}: {e}")
        threading.Thread(target=replicate, daemon=True).start()

    def _receive_replication(self, operation: WriteOperation):
        """Apply a replicated write using per-key Last-Write-Wins."""
        with self.lock:
            current_version = self.vector_clock.get(operation.key, 0)
            # Conflict resolution: LWW against THIS key's newest accepted write
            if (operation.key not in self.data
                    or operation.timestamp > self.last_write_ts.get(operation.key, 0.0)):
                self.data[operation.key] = operation.value
                self.vector_clock[operation.key] = current_version + 1
                self.last_write_ts[operation.key] = operation.timestamp
                print(f"✓ {self.node_id}: Replicated {operation.key}={operation.value} from {operation.node_id}")

    def read(self, key: str, consistency_level='ONE'):
        """
        Read with a chosen consistency level.

        ONE: read one node (fastest, may be stale)
        QUORUM: read a majority (balanced)
        ALL: read every node (most consistent, slowest)
        """
        if consistency_level == 'ONE':
            # Local read (fast but only eventually consistent)
            value = self.data.get(key)
            print(f"🔍 {self.node_id}: Read {key}={value} (ONE)")
            return value
        elif consistency_level == 'QUORUM':
            # Read from a majority of nodes
            quorum_size = (len(self.peer_nodes) + 1) // 2 + 1
            responses = [self.data.get(key)]
            for peer in self.peer_nodes[:quorum_size - 1]:
                responses.append(peer.data.get(key))
            # Majority value. The original used max(set(responses), ...), which
            # raises TypeError for unhashable values such as dicts.
            value = max(responses, key=responses.count)
            print(f"🔍 {self.node_id}: Read {key}={value} (QUORUM)")
            return value
        elif consistency_level == 'ALL':
            # Read from every node (strong consistency check)
            responses = [self.data.get(key)]
            for peer in self.peer_nodes:
                responses.append(peer.data.get(key))
            # Detect divergence without set() so unhashable values work
            if any(r != responses[0] for r in responses):
                print(f"⚠️ Inconsistency detected for {key}: {responses}")
            value = responses[0]
            print(f"🔍 {self.node_id}: Read {key}={value} (ALL)")
            return value

    def get_stats(self):
        """Snapshot of this node's state for diagnostics."""
        return {
            'node_id': self.node_id,
            'keys': len(self.data),
            'writes': len(self.write_log),
            'data': dict(self.data)
        }
# ===== DEMOSTRACIÓN =====
print("=== Eventual Consistency Demo ===\n")

# Build a 3-node cluster
node1 = EventuallyConsistentStore('Node1')
node2 = EventuallyConsistentStore('Node2')
node3 = EventuallyConsistentStore('Node3')

# Fully connect the nodes (each one peers with the other two)
node1.connect_to_peer(node2)
node1.connect_to_peer(node3)
node2.connect_to_peer(node1)
node2.connect_to_peer(node3)
node3.connect_to_peer(node1)
node3.connect_to_peer(node2)

print("\n--- Scenario 1: Normal Operation ---")
node1.write('user:123', {'name': 'Alice', 'age': 30})
time.sleep(0.3)  # wait for async replication

# Read from every node
node1.read('user:123', 'ONE')
node2.read('user:123', 'ONE')  # eventually consistent
node3.read('user:123', 'ONE')

print("\n--- Scenario 2: Read Before Replication ---")
node1.write('user:456', {'name': 'Bob', 'age': 25})
node2.read('user:456', 'ONE')  # may not have arrived yet
time.sleep(0.3)
node2.read('user:456', 'ONE')  # replicated by now

print("\n--- Scenario 3: Conflicting Writes ---")
# Concurrent writes to the same key on different nodes
node1.write('user:789', {'name': 'Charlie', 'version': 1})
node2.write('user:789', {'name': 'Charlie Updated', 'version': 2})
time.sleep(0.5)  # wait for convergence

# Last-Write-Wins: the most recent write should win everywhere
print("\nAfter convergence:")
for node in [node1, node2, node3]:
    value = node.read('user:789', 'ONE')

print("\n--- Node Stats ---")
for node in [node1, node2, node3]:
    print(f"{node.node_id}: {node.get_stats()}")
Estrategias de resolución de conflictos:
class ConflictResolution:
    """A menu of strategies for reconciling divergent replica values."""

    @staticmethod
    def last_write_wins(values_with_timestamps):
        """LWW: keep the entry bearing the newest timestamp."""
        newest = values_with_timestamps[0]
        for candidate in values_with_timestamps[1:]:
            if candidate['timestamp'] > newest['timestamp']:
                newest = candidate
        return newest

    @staticmethod
    def first_write_wins(values_with_timestamps):
        """FWW: keep the entry bearing the oldest timestamp."""
        return min(values_with_timestamps, key=lambda entry: entry['timestamp'])

    @staticmethod
    def merge_sets(values):
        """For sets: the union of every replica's value."""
        merged = set()
        for chunk in values:
            merged |= set(chunk)
        return merged

    @staticmethod
    def custom_merge(values, merge_function):
        """Delegate merging to an application-supplied function."""
        return merge_function(values)

    @staticmethod
    def vector_clock_resolution(values_with_clocks):
        """Use vector clocks to detect causality (simplified).

        Returns the first value not causally dominated by any other; on a
        genuine concurrent conflict, falls back to the first value.
        """
        for candidate in values_with_clocks:
            dominated = any(
                candidate != other and all(
                    candidate['clock'][key] <= other['clock'].get(key, 0)
                    for key in candidate['clock']
                )
                for other in values_with_clocks
            )
            if not dominated:
                return candidate
        return values_with_clocks[0]
# Example: shopping cart merged across replicas
class ShoppingCart:
    """Shopping cart modelled for eventual consistency."""

    @staticmethod
    def merge_carts(cart1: set, cart2: set):
        """Merge two cart replicas by set union.

        ADD-WINS semantics: an item present in either replica survives,
        so we may resurrect a removed item but never lose one.
        """
        return cart1 | cart2


# Two replicas diverged; merging keeps every item from both.
cart_node1 = {'item1', 'item2'}
cart_node2 = {'item2', 'item3'}
merged = ShoppingCart.merge_carts(cart_node1, cart_node2)
print(f"Merged cart: {merged}")  # {'item1', 'item2', 'item3'}
Cuándo usar consistencia eventual:
- ✅ Redes sociales (likes, follows pueden estar desfasados)
- ✅ Carritos de compra (mejor agregar de más que perder items)
- ✅ Feeds de noticias
- ✅ Sistemas de recomendación
- ✅ CDNs (contenido estático)
Cuándo NO usar:
- ❌ Transacciones financieras
- ❌ Reservas de inventario
- ❌ Sistemas de votación
- ❌ Gestión de permisos críticos
Receta 2.5: Load Balancing, Caching y Queues
Load Balancing: Distribuir tráfico entre servidores
from typing import List
from dataclasses import dataclass
import hashlib
import random
@dataclass
class Server:
    """A backend server as seen by the load balancer."""

    id: str
    host: str
    port: int
    # Relative capacity, consumed by weighted round-robin.
    weight: int = 1
    # Either 'healthy' or 'unhealthy'; toggled by health checks.
    health: str = 'healthy'
    # Live connection count, consumed by least-connections selection.
    active_connections: int = 0
class LoadBalancer:
    """Load balancer offering several server-selection algorithms."""

    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.round_robin_index = 0
        self.request_count = 0

    def _healthy(self) -> List[Server]:
        # Only healthy servers are eligible; fail loudly if none remain.
        candidates = [srv for srv in self.servers if srv.health == 'healthy']
        if not candidates:
            raise Exception("No healthy servers")
        return candidates

    def get_server_round_robin(self) -> Server:
        """Round robin: cycle through healthy servers evenly."""
        candidates = self._healthy()
        chosen = candidates[self.round_robin_index % len(candidates)]
        self.round_robin_index += 1
        return chosen

    def get_server_weighted_round_robin(self) -> Server:
        """Weighted round robin: servers appear in proportion to their weight."""
        pool = []
        for srv in self._healthy():
            # Repeat each server `weight` times so heavier servers are
            # selected proportionally more often.
            pool.extend([srv] * srv.weight)
        chosen = pool[self.round_robin_index % len(pool)]
        self.round_robin_index += 1
        return chosen

    def get_server_least_connections(self) -> Server:
        """Least connections: the healthy server with fewest active connections."""
        return min(self._healthy(), key=lambda srv: srv.active_connections)

    def get_server_ip_hash(self, client_ip: str) -> Server:
        """IP hash: the same client always maps to the same server (sticky sessions)."""
        candidates = self._healthy()
        # Consistent hash of the client IP (md5 used for distribution,
        # not for security).
        digest = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
        return candidates[digest % len(candidates)]

    def get_server_random(self) -> Server:
        """Random: any healthy server."""
        return random.choice(self._healthy())

    def health_check(self):
        """Refresh each server's health flag (simulated 10% failure rate)."""
        for srv in self.servers:
            # Production code would issue an HTTP health probe here;
            # the simulation fails a server at random instead.
            if random.random() < 0.1:
                srv.health = 'unhealthy'
                print(f"⚠️ {srv.id} is unhealthy")
            else:
                srv.health = 'healthy'
# ===== CACHING STRATEGIES =====
import time
from collections import OrderedDict
class LRUCache:
    """Fixed-capacity cache that evicts the least recently used entry."""

    def __init__(self, capacity: int):
        # OrderedDict insertion order doubles as recency order:
        # oldest entry first, most recently used last.
        self.cache = OrderedDict()
        self.capacity = capacity
        self.hits = 0
        self.misses = 0

    def get(self, key: str):
        """Return the cached value, or None on a miss."""
        try:
            value = self.cache[key]
        except KeyError:
            self.misses += 1
            return None
        # Promote to the most-recently-used position.
        self.cache.move_to_end(key)
        self.hits += 1
        return value

    def put(self, key: str, value):
        """Insert or refresh an entry, evicting the LRU entry when full."""
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.capacity:
            # Drop the least recently used entry (the first one).
            self.cache.popitem(last=False)
        self.cache[key] = value

    def get_stats(self):
        """Return hit/miss counters, hit rate and current size."""
        lookups = self.hits + self.misses
        rate = self.hits / lookups * 100 if lookups else 0
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': f'{rate:.1f}%',
            'size': len(self.cache),
        }
class CacheAside:
    """Cache-aside (lazy loading): the application fills the cache on demand."""

    def __init__(self, cache: LRUCache, database):
        self.cache = cache
        self.database = database

    def get(self, key: str):
        """Read through the cache.

        1. Try the cache.
        2. On a miss, fetch from the database.
        3. Backfill the cache so the next read is a hit.
        """
        cached = self.cache.get(key)
        if cached is not None:
            print(f"✓ Cache HIT for {key}")
            return cached
        print(f"✗ Cache MISS for {key}, fetching from DB...")
        time.sleep(0.1)  # simulated database latency
        fetched = self.database.get(key)
        if fetched is not None:
            self.cache.put(key, fetched)
        return fetched

    def update(self, key: str, value):
        """Write-through: update the database and the cache together."""
        self.database[key] = value
        self.cache.put(key, value)
class WriteBackCache:
    """Write-back (write-behind): writes hit the cache; the DB is updated asynchronously."""

    def __init__(self, cache: LRUCache, database, flush_interval=5):
        self.cache = cache
        self.database = database
        self.dirty_keys = set()  # keys written to the cache but not yet persisted
        self.flush_interval = flush_interval
        # Background flusher
        import threading
        self.flusher_thread = threading.Thread(target=self._flush_periodically, daemon=True)
        self.flusher_thread.start()

    def write(self, key: str, value):
        """Write to the cache only; the DB write is deferred to the flusher."""
        self.cache.put(key, value)
        self.dirty_keys.add(key)
        print(f"✓ Wrote {key} to cache (DB write pending)")

    def _flush_periodically(self):
        """Flush dirty keys to the DB every `flush_interval` seconds."""
        while True:
            time.sleep(self.flush_interval)
            self._flush()

    def _flush(self):
        """Persist the currently dirty keys to the DB.

        BUG FIX: the original iterated `self.dirty_keys` and then called
        `self.dirty_keys.clear()`, which dropped any key marked dirty by a
        concurrent `write()` during the flush — a lost update. We now swap
        in a fresh set first, so keys written while flushing remain dirty
        and are persisted on the next cycle.
        """
        if not self.dirty_keys:
            return
        pending, self.dirty_keys = self.dirty_keys, set()
        print(f"\n🔄 Flushing {len(pending)} keys to DB...")
        for key in pending:
            value = self.cache.get(key)
            # NOTE(review): a value evicted from the cache before flushing
            # is silently lost here, as in the original — write-back
            # inherently risks that unless the cache pins dirty entries.
            if value is not None:
                self.database[key] = value
        print("✓ Flush complete")
# ===== CACHE INVALIDATION =====
class CacheInvalidation:
    """Cache invalidation strategies."""

    @staticmethod
    def ttl_based(cache_entry, ttl_seconds=300):
        """Time-to-live: report whether the entry is older than `ttl_seconds`."""
        return time.time() - cache_entry['timestamp'] > ttl_seconds

    @staticmethod
    def event_based(cache, event_type, affected_keys):
        """Event-based: drop the keys touched by a data-changing event."""
        for stale_key in affected_keys:
            cache.cache.pop(stale_key, None)
        print(f"✓ Invalidated {len(affected_keys)} keys due to {event_type}")

    @staticmethod
    def tag_based(cache, tags):
        """Tag-based: invalidate whole categories of entries (not implemented).

        Entries would carry tags, e.g. {'key': 'user:123', 'tags': ['users',
        'profile']}, and invalidating the tag 'users' would drop all of them.
        """
        pass
# ===== DEMO =====
print("=== Load Balancing Demo ===\n")

servers = [
    Server('server1', '10.0.0.1', 8080, weight=3),
    Server('server2', '10.0.0.2', 8080, weight=2),
    Server('server3', '10.0.0.3', 8080, weight=1),
]
lb = LoadBalancer(servers)

# Round robin: each request goes to the next server in turn.
print("--- Round Robin ---")
for request_no in range(1, 7):
    chosen = lb.get_server_round_robin()
    print(f"Request {request_no} → {chosen.id}")

# Weighted round robin: heavier servers receive proportionally more requests.
print("\n--- Weighted Round Robin ---")
lb.round_robin_index = 0  # restart the rotation for a clean comparison
for request_no in range(1, 7):
    chosen = lb.get_server_weighted_round_robin()
    print(f"Request {request_no} → {chosen.id}")

# IP hash: repeated requests from the same client hit the same server.
print("\n--- IP Hash (Sticky Sessions) ---")
clients = ['192.168.1.1', '192.168.1.2', '192.168.1.1', '192.168.1.3', '192.168.1.1']
for request_no, client_ip in enumerate(clients, start=1):
    chosen = lb.get_server_ip_hash(client_ip)
    print(f"Request {request_no} from {client_ip} → {chosen.id}")

print("\n=== Caching Demo ===\n")

# Backing store for the cache-aside demo.
database = {
    'user:1': {'name': 'Alice', 'age': 30},
    'user:2': {'name': 'Bob', 'age': 25},
    'user:3': {'name': 'Charlie', 'age': 35}
}
cache = LRUCache(capacity=2)
cache_aside = CacheAside(cache, database)

print("--- Cache-Aside Pattern ---")
cache_aside.get('user:1')  # MISS
cache_aside.get('user:1')  # HIT
cache_aside.get('user:2')  # MISS
cache_aside.get('user:3')  # MISS (evicts user:1)
cache_aside.get('user:1')  # MISS again
print(f"\nCache stats: {cache.get_stats()}")

print("\n--- Write-Back Pattern ---")
wb_cache = WriteBackCache(LRUCache(10), {})
wb_cache.write('config:theme', 'dark')
wb_cache.write('config:language', 'es')
print("Waiting for flush...")
time.sleep(6)  # the background flusher runs every 5 s by default
print(f"DB contents: {wb_cache.database}")
Estrategias de caching por capa:
| Capa | Ejemplo | TTL típico |
|---|---|---|
| CDN | Cloudflare, Akamai | Horas/Días |
| Application | Redis, Memcached | Minutos/Horas |
| Database | Query cache | Segundos/Minutos |
| Browser | Local storage | Días/Semanas |