🏗️ Developer Cookbook - PHASE 2: Architecture and System Design
Practical recipes for designing scalable, robust systems
📚 Table of Contents
Scalable Software Design
Recipe 2.1: Monolith vs Microservices - When to Use Which?
What are they?
- Monolith: A single application in which all components are coupled together
- Microservices: An application split into small, independent services
Comparison:
| Feature | Monolith | Microservices |
|---|---|---|
| Initial complexity | 🟢 Low | 🔴 High |
| Deployment | Simple (1 deploy) | Complex (N deploys) |
| Scalability | Vertical only | Horizontal per service |
| Testing | 🟢 Easy E2E | 🔴 Hard E2E |
| Initial development | 🟢 Fast | 🔴 Slow |
| Operations | Simple | Requires DevOps/SRE |
| Debugging | 🟢 Easy | 🔴 Distributed traces |
| Technology | Single stack | Polyglot |
Use case: eCommerce
# ===== MONOLITH =====
from flask import Flask, request, jsonify
from datetime import datetime
import sqlite3
app = Flask(__name__)
class MonolithEcommerce:
"""Todo en una aplicación"""
def __init__(self):
self.db = sqlite3.connect('ecommerce.db', check_same_thread=False)
self._init_db()
def _init_db(self):
"""Todas las tablas en una DB"""
cursor = self.db.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
email TEXT,
name TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
name TEXT,
price REAL,
stock INTEGER
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY,
user_id INTEGER,
total REAL,
status TEXT,
created_at TEXT
)
''')
self.db.commit()
def create_order(self, user_id, product_ids):
"""Todo el flujo de orden en un lugar"""
cursor = self.db.cursor()
        # 1. Validate the user
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
user = cursor.fetchone()
if not user:
return {'error': 'User not found'}, 404
        # 2. Compute the total and validate stock
total = 0
for product_id in product_ids:
cursor.execute('SELECT price, stock FROM products WHERE id = ?', (product_id,))
product = cursor.fetchone()
if not product or product[1] < 1:
return {'error': f'Product {product_id} unavailable'}, 400
total += product[0]
        # 3. Create the order
cursor.execute('''
INSERT INTO orders (user_id, total, status, created_at)
VALUES (?, ?, ?, ?)
''', (user_id, total, 'pending', datetime.now().isoformat()))
        # 4. Decrement stock
for product_id in product_ids:
cursor.execute('''
UPDATE products SET stock = stock - 1 WHERE id = ?
''', (product_id,))
        # 5. Send confirmation email (simulated)
self._send_order_email(user[1], total)
self.db.commit()
return {'order_id': cursor.lastrowid, 'total': total}, 201
def _send_order_email(self, email, total):
"""Email integrado en el monolito"""
print(f"📧 Sending order confirmation to {email} for ${total}")
monolith = MonolithEcommerce()
@app.route('/orders', methods=['POST'])
def create_order():
data = request.json
result, status = monolith.create_order(
data['user_id'],
data['product_ids']
)
return jsonify(result), status
# ===== MICROSERVICES =====
import requests
from flask import Flask
# Service 1: Users
user_service = Flask('users')
users_db = {}
@user_service.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
user = users_db.get(user_id)
if not user:
return jsonify({'error': 'Not found'}), 404
return jsonify(user), 200
# Service 2: Products/Inventory
product_service = Flask('products')
products_db = {
1: {'name': 'Laptop', 'price': 999.99, 'stock': 10},
2: {'name': 'Mouse', 'price': 29.99, 'stock': 50}
}
@product_service.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
product = products_db.get(product_id)
if not product:
return jsonify({'error': 'Not found'}), 404
return jsonify(product), 200
@product_service.route('/products/<int:product_id>/reserve', methods=['POST'])
def reserve_stock(product_id):
    """Reserve stock (inventory management)"""
    product = products_db.get(product_id)
    if not product or product['stock'] < 1:
        return jsonify({'error': 'Out of stock'}), 400
    product['stock'] -= 1
    return jsonify({'reserved': True}), 200
@product_service.route('/products/<int:product_id>/release', methods=['POST'])
def release_stock(product_id):
    """Release a reservation (the compensation endpoint used by the saga rollback below)"""
    product = products_db.get(product_id)
    if not product:
        return jsonify({'error': 'Not found'}), 404
    product['stock'] += 1
    return jsonify({'released': True}), 200
# Service 3: Orders (Orchestrator)
order_service = Flask('orders')
orders_db = {}
class OrderOrchestrator:
"""Coordina llamadas entre microservicios"""
USER_SERVICE_URL = "http://localhost:5001"
PRODUCT_SERVICE_URL = "http://localhost:5002"
NOTIFICATION_SERVICE_URL = "http://localhost:5003"
def create_order(self, user_id, product_ids):
"""Flujo distribuido con compensaciones"""
try:
            # 1. Validate the user (call to user-service)
user_response = requests.get(
f"{self.USER_SERVICE_URL}/users/{user_id}",
timeout=2
)
if user_response.status_code != 200:
return {'error': 'Invalid user'}, 400
user = user_response.json()
            # 2. Reserve products (calls to product-service)
total = 0
reserved_products = []
for product_id in product_ids:
                # Fetch the product
product_response = requests.get(
f"{self.PRODUCT_SERVICE_URL}/products/{product_id}",
timeout=2
)
if product_response.status_code != 200:
                    # Compensate: release already-reserved products
self._rollback_reservations(reserved_products)
return {'error': f'Product {product_id} not found'}, 400
product = product_response.json()
total += product['price']
                # Reserve stock
reserve_response = requests.post(
f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/reserve",
timeout=2
)
if reserve_response.status_code != 200:
                    # Compensate
self._rollback_reservations(reserved_products)
return {'error': f'Product {product_id} out of stock'}, 400
reserved_products.append(product_id)
            # 3. Create the order
order_id = len(orders_db) + 1
orders_db[order_id] = {
'id': order_id,
'user_id': user_id,
'products': product_ids,
'total': total,
'status': 'confirmed',
'created_at': datetime.now().isoformat()
}
            # 4. Notify (async, fire-and-forget)
try:
requests.post(
f"{self.NOTIFICATION_SERVICE_URL}/send",
json={
'email': user['email'],
'template': 'order_confirmation',
'data': {'order_id': order_id, 'total': total}
},
timeout=1
)
            except requests.RequestException:
                # Don't fail the order if the email fails
                print("⚠️ Notification failed, but order created")
return {'order_id': order_id, 'total': total}, 201
except requests.Timeout:
return {'error': 'Service timeout'}, 504
except requests.ConnectionError:
return {'error': 'Service unavailable'}, 503
    def _rollback_reservations(self, product_ids):
        """Saga pattern: compensating transactions"""
        for product_id in product_ids:
            try:
                requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/release",
                    timeout=1
                )
            except requests.RequestException:
                print(f"⚠️ Failed to roll back product {product_id}")
orchestrator = OrderOrchestrator()
@order_service.route('/orders', methods=['POST'])
def create_order_micro():
data = request.json
result, status = orchestrator.create_order(
data['user_id'],
data['product_ids']
)
return jsonify(result), status
# Service 4: Notifications (async)
notification_service = Flask('notifications')
@notification_service.route('/send', methods=['POST'])
def send_notification():
"""Servicio independiente de notificaciones"""
data = request.json
print(f"📧 Sending {data['template']} to {data['email']}")
    # Integration with SendGrid, SES, etc. would go here
return jsonify({'sent': True}), 200
When to use a Monolith:
- ✅ Startup/MVP (development speed)
- ✅ Small team (<10 devs)
- ✅ Well-defined, stable domain
- ✅ Low/medium traffic (<10k users)
- ✅ No need to scale components independently
When to use Microservices:
- ✅ Large organizations (multiple teams)
- ✅ Need to scale components independently
- ✅ Different technologies per component
- ✅ Independent deployment is critical
- ✅ High availability (isolated failures)
⚠️ The golden rule:
“Start with a monolith, evolve to microservices when pain points are clear”
— Martin Fowler
Recipe 2.2: Event-Driven Architecture
What is it? An architecture in which components communicate through events (messages) instead of direct synchronous calls.
Advantages:
- ✅ Full decoupling between services
- ✅ Independent scalability
- ✅ Resilience (services can be down temporarily)
- ✅ Complete audit trail (event log)
Key components:
from dataclasses import dataclass
from datetime import datetime
from typing import List, Callable
import json
from queue import Queue
import threading
# ===== Event Bus =====
@dataclass
class Event:
"""Evento inmutable"""
event_type: str
aggregate_id: str
data: dict
timestamp: datetime
metadata: dict = None
def to_json(self):
return {
'event_type': self.event_type,
'aggregate_id': self.aggregate_id,
'data': self.data,
'timestamp': self.timestamp.isoformat(),
'metadata': self.metadata or {}
}
class EventBus:
"""Bus de eventos in-memory (en producción: Kafka, RabbitMQ, SQS)"""
def __init__(self):
self.subscribers = {} # {event_type: [handlers]}
self.event_log = [] # Store de todos los eventos
def publish(self, event: Event):
"""Publicar evento"""
print(f"📢 Published: {event.event_type}")
self.event_log.append(event)
        # Notify subscribers
handlers = self.subscribers.get(event.event_type, [])
for handler in handlers:
try:
                # In production this would go through a queue (async)
threading.Thread(target=handler, args=(event,)).start()
except Exception as e:
print(f"❌ Handler error: {e}")
def subscribe(self, event_type: str, handler: Callable):
"""Suscribirse a tipo de evento"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
print(f"✓ Subscribed to {event_type}")
def replay_events(self, aggregate_id: str):
"""Reconstruir estado desde eventos (Event Sourcing)"""
return [e for e in self.event_log if e.aggregate_id == aggregate_id]
# ===== Example: Order system =====
event_bus = EventBus()
# Service 1: Order Service (publishes events)
class OrderService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.orders = {}
def create_order(self, order_id: str, user_id: str, items: List[dict]):
"""Crear orden y publicar evento"""
order = {
'id': order_id,
'user_id': user_id,
'items': items,
'status': 'created',
'total': sum(item['price'] * item['qty'] for item in items)
}
self.orders[order_id] = order
        # Publish an event (instead of calling other services directly)
self.event_bus.publish(Event(
event_type='OrderCreated',
aggregate_id=order_id,
data=order,
timestamp=datetime.now(),
metadata={'user_id': user_id}
))
return order
def confirm_payment(self, order_id: str):
"""Confirmar pago"""
order = self.orders[order_id]
order['status'] = 'paid'
self.event_bus.publish(Event(
event_type='OrderPaid',
aggregate_id=order_id,
data={'order_id': order_id, 'total': order['total']},
timestamp=datetime.now()
))
# Service 2: Inventory Service (reacts to events)
class InventoryService:
def __init__(self, event_bus: EventBus):
self.inventory = {
'laptop': 10,
'mouse': 50
}
        # Subscribe to relevant events
event_bus.subscribe('OrderCreated', self.handle_order_created)
def handle_order_created(self, event: Event):
"""Reservar stock cuando se crea orden"""
print(f"\n🏪 InventoryService processing {event.event_type}")
items = event.data['items']
order_id = event.aggregate_id
        # Validate and reserve stock
for item in items:
product = item['product']
qty = item['qty']
if self.inventory.get(product, 0) < qty:
                # Publish a failure event
event_bus.publish(Event(
event_type='InventoryReservationFailed',
aggregate_id=order_id,
data={'product': product, 'requested': qty},
timestamp=datetime.now()
))
return
self.inventory[product] -= qty
        # Success: publish event
event_bus.publish(Event(
event_type='InventoryReserved',
aggregate_id=order_id,
data={'items': items},
timestamp=datetime.now()
))
# Service 3: Payment Service
class PaymentService:
def __init__(self, event_bus: EventBus):
event_bus.subscribe('InventoryReserved', self.handle_inventory_reserved)
def handle_inventory_reserved(self, event: Event):
"""Procesar pago cuando stock está reservado"""
print(f"\n💳 PaymentService processing {event.event_type}")
order_id = event.aggregate_id
        # Simulate payment processing
        success = True  # In reality this would call Stripe, etc.
if success:
event_bus.publish(Event(
event_type='PaymentSucceeded',
aggregate_id=order_id,
data={'payment_id': 'pay_123'},
timestamp=datetime.now()
))
else:
event_bus.publish(Event(
event_type='PaymentFailed',
aggregate_id=order_id,
data={'reason': 'Insufficient funds'},
timestamp=datetime.now()
))
# Service 4: Notification Service
class NotificationService:
def __init__(self, event_bus: EventBus):
event_bus.subscribe('OrderCreated', self.send_order_confirmation)
event_bus.subscribe('PaymentSucceeded', self.send_payment_confirmation)
event_bus.subscribe('InventoryReservationFailed', self.send_failure_notification)
def send_order_confirmation(self, event: Event):
print(f"\n📧 Sending order confirmation for {event.aggregate_id}")
def send_payment_confirmation(self, event: Event):
print(f"\n📧 Sending payment confirmation for {event.aggregate_id}")
def send_failure_notification(self, event: Event):
print(f"\n📧 Sending failure notification for {event.aggregate_id}")
# ===== Demo =====
print("=== Event-Driven Architecture Demo ===\n")
# Initialize services
order_service = OrderService(event_bus)
inventory_service = InventoryService(event_bus)
payment_service = PaymentService(event_bus)
notification_service = NotificationService(event_bus)
# Create an order (this triggers the whole chain)
import time
order = order_service.create_order(
order_id='order_001',
user_id='user_123',
items=[
{'product': 'laptop', 'price': 999, 'qty': 1},
{'product': 'mouse', 'price': 29, 'qty': 2}
]
)
# Wait for the events to be processed (async)
time.sleep(0.5)
print("\n=== Event Log ===")
for event in event_bus.event_log:
print(f"- {event.event_type} @ {event.timestamp.strftime('%H:%M:%S')}")
# Rebuild state from events (Event Sourcing)
print("\n=== Replay Events for order_001 ===")
events = event_bus.replay_events('order_001')
for event in events:
print(f"- {event.event_type}: {event.data}")
Messaging patterns:
# 1. PUBLISH-SUBSCRIBE (1:N)
# One event, multiple consumers
event_bus.subscribe('UserRegistered', send_welcome_email)
event_bus.subscribe('UserRegistered', create_default_preferences)
event_bus.subscribe('UserRegistered', send_to_analytics)
# 2. POINT-TO-POINT (1:1)
# Task queue: one message, one consumer
task_queue = Queue()
task_queue.put({'task': 'send_email', 'to': 'user@example.com'})
# Workers compete for tasks
def worker():
while True:
task = task_queue.get()
process_task(task)
task_queue.task_done()
# 3. REQUEST-REPLY (simulated synchronous)
# The client waits for a response
def request_with_correlation():
correlation_id = 'req_123'
    # Publish the request
event_bus.publish(Event(
event_type='GetUserRequest',
aggregate_id=correlation_id,
data={'user_id': '456'},
timestamp=datetime.now()
))
    # Wait for the response (normally with a timeout)
    # In production: use a reply queue, as sketched below
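To make the reply-queue idea concrete, here is a minimal sketch built on the in-memory EventBus above (the reply_queues registry and both handlers are illustrative, not a standard API):
import queue
reply_queues = {}  # {correlation_id: Queue} - per-request reply channels
def handle_get_user_request(event: Event):
    """Responder: publish a response carrying the same correlation id."""
    event_bus.publish(Event(
        event_type='GetUserResponse',
        aggregate_id=event.aggregate_id,  # same correlation id
        data={'user_id': event.data['user_id'], 'name': 'Alice'},
        timestamp=datetime.now()
    ))
def handle_get_user_response(event: Event):
    """Route the response to the waiting requester."""
    q = reply_queues.get(event.aggregate_id)
    if q:
        q.put(event)
event_bus.subscribe('GetUserRequest', handle_get_user_request)
event_bus.subscribe('GetUserResponse', handle_get_user_response)
def request_user(user_id: str, timeout: float = 2.0):
    correlation_id = f'req_{user_id}'
    reply_queues[correlation_id] = queue.Queue()
    event_bus.publish(Event(
        event_type='GetUserRequest',
        aggregate_id=correlation_id,
        data={'user_id': user_id},
        timestamp=datetime.now()
    ))
    try:
        return reply_queues[correlation_id].get(timeout=timeout).data  # blocks until the reply arrives
    finally:
        del reply_queues[correlation_id]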
Technologies for Event-Driven systems:
- Message Brokers: RabbitMQ, Apache Kafka, AWS SQS/SNS, Google Pub/Sub (see the producer sketch below)
- Event Stores: EventStoreDB, Apache Kafka (can also act as a store)
- Streaming: Apache Kafka, AWS Kinesis, Apache Flink
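In production, the publish call from the example above maps onto a broker client. A minimal sketch with the kafka-python client, assuming a broker at localhost:9092 and a topic named 'orders' (both assumptions of this example):
import json
from kafka import KafkaProducer  # pip install kafka-python
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Publish the OrderCreated payload to the 'orders' topic
producer.send('orders', {'event_type': 'OrderCreated', 'order_id': 'order_001'})
producer.flush()  # block until the broker acknowledges the message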
Recipe 2.3: CQRS and Event Sourcing
What is CQRS? Command Query Responsibility Segregation: separating the write model (Commands) from the read model (Queries).
What is Event Sourcing? Storing state changes as a sequence of events instead of the current state.
Use case: Banking system
from typing import List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
# ===== EVENT SOURCING =====
class EventType(Enum):
ACCOUNT_CREATED = "account_created"
MONEY_DEPOSITED = "money_deposited"
MONEY_WITHDRAWN = "money_withdrawn"
ACCOUNT_FROZEN = "account_frozen"
@dataclass
class DomainEvent:
"""Evento de dominio inmutable"""
event_id: str
event_type: EventType
aggregate_id: str # Account ID
data: dict
timestamp: datetime = field(default_factory=datetime.now)
version: int = 1
class EventStore:
"""Store de eventos (en producción: EventStoreDB, PostgreSQL)"""
def __init__(self):
self.events = [] # En producción: DB append-only
def append(self, event: DomainEvent):
"""Agregar evento (inmutable, solo append)"""
self.events.append(event)
print(f"✓ Event stored: {event.event_type.value} v{event.version}")
def get_events(self, aggregate_id: str) -> List[DomainEvent]:
"""Obtener todos los eventos de un agregado"""
return [e for e in self.events if e.aggregate_id == aggregate_id]
def get_events_from_version(self, aggregate_id: str, from_version: int):
"""Obtener eventos desde cierta versión (para snapshots)"""
return [
e for e in self.events
if e.aggregate_id == aggregate_id and e.version >= from_version
]
# ===== WRITE MODEL (Commands) =====
class AccountAggregate:
"""Agregado que reconstruye estado desde eventos"""
def __init__(self, account_id: str):
self.account_id = account_id
self.balance = 0
self.status = 'inactive'
self.version = 0
self.uncommitted_events = []
def create_account(self, owner: str, initial_deposit: float):
"""Command: Crear cuenta"""
if self.status != 'inactive':
raise ValueError("Account already exists")
event = DomainEvent(
event_id=f"evt_{self.version + 1}",
event_type=EventType.ACCOUNT_CREATED,
aggregate_id=self.account_id,
data={'owner': owner, 'initial_deposit': initial_deposit},
version=self.version + 1
)
self._apply_event(event)
self.uncommitted_events.append(event)
def deposit(self, amount: float):
"""Command: Depositar"""
if amount <= 0:
raise ValueError("Amount must be positive")
event = DomainEvent(
event_id=f"evt_{self.version + 1}",
event_type=EventType.MONEY_DEPOSITED,
aggregate_id=self.account_id,
data={'amount': amount},
version=self.version + 1
)
self._apply_event(event)
self.uncommitted_events.append(event)
def withdraw(self, amount: float):
"""Command: Retirar"""
if amount <= 0:
raise ValueError("Amount must be positive")
if self.balance < amount:
raise ValueError("Insufficient funds")
if self.status == 'frozen':
raise ValueError("Account is frozen")
event = DomainEvent(
event_id=f"evt_{self.version + 1}",
event_type=EventType.MONEY_WITHDRAWN,
aggregate_id=self.account_id,
data={'amount': amount},
version=self.version + 1
)
self._apply_event(event)
self.uncommitted_events.append(event)
def _apply_event(self, event: DomainEvent):
"""Aplicar evento al estado (Event Sourcing)"""
if event.event_type == EventType.ACCOUNT_CREATED:
self.status = 'active'
self.balance = event.data['initial_deposit']
elif event.event_type == EventType.MONEY_DEPOSITED:
self.balance += event.data['amount']
elif event.event_type == EventType.MONEY_WITHDRAWN:
self.balance -= event.data['amount']
elif event.event_type == EventType.ACCOUNT_FROZEN:
self.status = 'frozen'
self.version = event.version
def load_from_history(self, events: List[DomainEvent]):
"""Reconstruir estado desde eventos (Event Sourcing)"""
for event in events:
self._apply_event(event)
def get_uncommitted_events(self):
"""Obtener eventos pendientes de guardar"""
events = self.uncommitted_events
self.uncommitted_events = []
return events
# ===== READ MODEL (Queries) =====
class AccountReadModel:
"""Modelo optimizado para lecturas (CQRS)"""
def __init__(self):
# Diferentes proyecciones para diferentes queries
self.accounts_by_id = {}
self.accounts_by_balance = [] # Ordenado por balance
self.transaction_history = {} # {account_id: [transactions]}
def project_event(self, event: DomainEvent):
"""Actualizar proyección cuando llega evento"""
account_id = event.aggregate_id
if event.event_type == EventType.ACCOUNT_CREATED:
self.accounts_by_id[account_id] = {
'id': account_id,
'owner': event.data['owner'],
'balance': event.data['initial_deposit'],
'status': 'active',
'created_at': event.timestamp
}
self.transaction_history[account_id] = []
elif event.event_type == EventType.MONEY_DEPOSITED:
self.accounts_by_id[account_id]['balance'] += event.data['amount']
self.transaction_history[account_id].append({
'type': 'deposit',
'amount': event.data['amount'],
'timestamp': event.timestamp,
'balance_after': self.accounts_by_id[account_id]['balance']
})
elif event.event_type == EventType.MONEY_WITHDRAWN:
self.accounts_by_id[account_id]['balance'] -= event.data['amount']
self.transaction_history[account_id].append({
'type': 'withdrawal',
'amount': event.data['amount'],
'timestamp': event.timestamp,
'balance_after': self.accounts_by_id[account_id]['balance']
})
        # Update the balance-sorted index
self._rebuild_balance_index()
def _rebuild_balance_index(self):
"""Reconstruir índice ordenado"""
self.accounts_by_balance = sorted(
self.accounts_by_id.values(),
key=lambda x: x['balance'],
reverse=True
)
def get_account(self, account_id: str):
"""Query: Obtener cuenta por ID - O(1)"""
return self.accounts_by_id.get(account_id)
def get_richest_accounts(self, limit: int = 10):
"""Query: Top cuentas por balance - O(1) por índice pre-calculado"""
return self.accounts_by_balance[:limit]
def get_transaction_history(self, account_id: str):
"""Query: Historial de transacciones - O(1)"""
return self.transaction_history.get(account_id, [])
# ===== COMMAND HANDLER =====
class BankingCommandHandler:
"""Maneja commands y persiste eventos"""
def __init__(self, event_store: EventStore, read_model: AccountReadModel):
self.event_store = event_store
self.read_model = read_model
def handle_create_account(self, account_id: str, owner: str, initial_deposit: float):
"""Command Handler: Crear cuenta"""
# 1. Cargar agregado desde eventos
account = AccountAggregate(account_id)
events = self.event_store.get_events(account_id)
account.load_from_history(events)
        # 2. Execute the command
account.create_account(owner, initial_deposit)
        # 3. Persist the events
for event in account.get_uncommitted_events():
self.event_store.append(event)
            # 4. Update the read model (eventual consistency)
self.read_model.project_event(event)
def handle_deposit(self, account_id: str, amount: float):
"""Command Handler: Depositar"""
account = AccountAggregate(account_id)
events = self.event_store.get_events(account_id)
account.load_from_history(events)
account.deposit(amount)
for event in account.get_uncommitted_events():
self.event_store.append(event)
self.read_model.project_event(event)
def handle_withdraw(self, account_id: str, amount: float):
"""Command Handler: Retirar"""
account = AccountAggregate(account_id)
events = self.event_store.get_events(account_id)
account.load_from_history(events)
account.withdraw(amount)
for event in account.get_uncommitted_events():
self.event_store.append(event)
self.read_model.project_event(event)
# ===== QUERY HANDLER =====
class BankingQueryHandler:
"""Maneja queries (solo lectura)"""
def __init__(self, read_model: AccountReadModel):
self.read_model = read_model
def get_account_balance(self, account_id: str):
"""Query: Balance actual"""
account = self.read_model.get_account(account_id)
return account['balance'] if account else None
def get_account_details(self, account_id: str):
"""Query: Detalles completos"""
return self.read_model.get_account(account_id)
def get_top_balances(self):
"""Query: Top 10 cuentas"""
return self.read_model.get_richest_accounts(10)
def get_transactions(self, account_id: str):
"""Query: Historial"""
return self.read_model.get_transaction_history(account_id)
# ===== DEMO =====
print("=== CQRS + Event Sourcing Demo ===\n")
event_store = EventStore()
read_model = AccountReadModel()
command_handler = BankingCommandHandler(event_store, read_model)
query_handler = BankingQueryHandler(read_model)
# COMMANDS (Write)
print("--- COMMANDS ---")
command_handler.handle_create_account('ACC001', 'Alice', 1000.0)
command_handler.handle_deposit('ACC001', 500.0)
command_handler.handle_withdraw('ACC001', 200.0)
command_handler.handle_deposit('ACC001', 1000.0)
command_handler.handle_create_account('ACC002', 'Bob', 5000.0)
command_handler.handle_deposit('ACC002', 2000.0)
# QUERIES (Read)
print("\n--- QUERIES ---")
balance = query_handler.get_account_balance('ACC001')
print(f"Alice's balance: ${balance}")
details = query_handler.get_account_details('ACC001')
print(f"Alice's account: {details}")
top_accounts = query_handler.get_top_balances()
print(f"\nTop accounts:")
for acc in top_accounts:
print(f" {acc['owner']}: ${acc['balance']}")
transactions = query_handler.get_transactions('ACC001')
print(f"\nAlice's transactions:")
for tx in transactions:
print(f" {tx['type']}: ${tx['amount']} -> Balance: ${tx['balance_after']}")
# EVENT SOURCING: Rebuild state from events
print("\n--- EVENT SOURCING: Rebuild from history ---")
new_account = AccountAggregate('ACC001')
events = event_store.get_events('ACC001')
new_account.load_from_history(events)
print(f"Rebuilt balance: ${new_account.balance} (version {new_account.version})")
print(f"\nTotal events stored: {len(event_store.events)}")
Advantages of CQRS + Event Sourcing:
- ✅ Complete audit trail (every change is recorded)
- ✅ Time travel (rebuild the state at any point)
- ✅ Read models optimized for each query
- ✅ Scalability (reads and writes scale separately)
- ✅ Event replay (reprocessing, analytics)
Disadvantages:
- ❌ High complexity
- ❌ Eventual consistency
- ❌ Storage grows forever (mitigation: snapshots, sketched below)
- ❌ Harder to debug
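Snapshots are the usual mitigation for the ever-growing event log: persist the aggregate state every N events and rebuild from the latest snapshot plus only the newer events. A minimal sketch on top of the EventStore and AccountAggregate above (Snapshot and SnapshotStore are illustrative names, not a standard API):
from dataclasses import dataclass
@dataclass
class Snapshot:
    aggregate_id: str
    balance: float
    status: str
    version: int
class SnapshotStore:
    def __init__(self):
        self.snapshots = {}  # {aggregate_id: Snapshot}
    def save(self, account: AccountAggregate):
        """Persist the current state (call e.g. every 100 events)."""
        self.snapshots[account.account_id] = Snapshot(
            account.account_id, account.balance, account.status, account.version
        )
    def load(self, account_id: str, event_store: EventStore) -> AccountAggregate:
        """Rebuild from the latest snapshot plus only the events after it."""
        account = AccountAggregate(account_id)
        snap = self.snapshots.get(account_id)
        if snap:
            account.balance = snap.balance
            account.status = snap.status
            account.version = snap.version
        # Replay only the events newer than the snapshot version
        account.load_from_history(
            event_store.get_events_from_version(account_id, account.version + 1)
        )
        return account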
Recipe 2.4: CAP Theorem and Eventual Consistency
CAP Theorem: in a distributed system you can only guarantee 2 of these 3:
- Consistency: everyone sees the same data at the same time
- Availability: the system always responds (even if with stale data)
- Partition tolerance: the system keeps working despite network failures
Trade-offs:
| System | Prioritizes | Example |
|---|---|---|
| CP | Consistency + Partition tolerance | Traditional banking, Zookeeper |
| AP | Availability + Partition tolerance | DNS, Cassandra, DynamoDB |
| CA | Consistency + Availability | Single-node RDBMS (not distributed) |
Implementing eventual consistency:
import time
import threading
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class WriteOperation:
"""Operación de escritura con timestamp"""
key: str
value: any
timestamp: float
node_id: str
class EventuallyConsistentStore:
"""Distributed key-value store con consistencia eventual (AP)"""
def __init__(self, node_id: str, replication_factor: int = 3):
self.node_id = node_id
self.replication_factor = replication_factor
self.data: Dict[str, any] = {}
self.vector_clock: Dict[str, int] = {} # Lamport clock
self.write_log: List[WriteOperation] = []
self.peer_nodes = []
self.lock = threading.Lock()
def connect_to_peer(self, peer_node):
"""Conectar a otro nodo (cluster)"""
self.peer_nodes.append(peer_node)
print(f"✓ {self.node_id} connected to {peer_node.node_id}")
    def write(self, key: str, value: any):
        """Write with eventual consistency"""
        with self.lock:
            # 1. Write locally (availability first)
            self.data[key] = value
            self.vector_clock[key] = self.vector_clock.get(key, 0) + 1
            operation = WriteOperation(
                key=key,
                value=value,
                timestamp=time.time(),
                node_id=self.node_id
            )
            self.key_timestamps[key] = operation.timestamp
            self.write_log.append(operation)
            print(f"✓ {self.node_id}: Wrote {key}={value} locally")
            # 2. Replicate asynchronously (eventual consistency)
            self._replicate_async(operation)
            return True
def _replicate_async(self, operation: WriteOperation):
"""Replicar a otros nodos asíncronamente"""
def replicate():
time.sleep(0.1) # Simular network delay
for peer in self.peer_nodes[:self.replication_factor - 1]:
try:
peer._receive_replication(operation)
except Exception as e:
print(f"⚠️ Replication failed to {peer.node_id}: {e}")
threading.Thread(target=replicate, daemon=True).start()
    def _receive_replication(self, operation: WriteOperation):
        """Receive a replicated write from another node"""
        with self.lock:
            # Conflict resolution: Last-Write-Wins per key, based on the write timestamp
            if operation.timestamp > self.key_timestamps.get(operation.key, 0):
                self.data[operation.key] = operation.value
                self.key_timestamps[operation.key] = operation.timestamp
                self.vector_clock[operation.key] = self.vector_clock.get(operation.key, 0) + 1
                print(f"✓ {self.node_id}: Replicated {operation.key}={operation.value} from {operation.node_id}")
def read(self, key: str, consistency_level='ONE'):
"""
Leer con diferentes niveles de consistencia
ONE: Leer de un nodo (más rápido, puede ser stale)
QUORUM: Leer de mayoría (balance)
ALL: Leer de todos (más consistente, más lento)
"""
if consistency_level == 'ONE':
            # Local read (fast but eventually consistent)
value = self.data.get(key)
print(f"🔍 {self.node_id}: Read {key}={value} (ONE)")
return value
elif consistency_level == 'QUORUM':
            # Read from a majority of nodes
quorum_size = (len(self.peer_nodes) + 1) // 2 + 1
responses = [self.data.get(key)]
for peer in self.peer_nodes[:quorum_size - 1]:
responses.append(peer.data.get(key))
            # Resolve: take the most common value
value = max(set(responses), key=responses.count)
print(f"🔍 {self.node_id}: Read {key}={value} (QUORUM)")
return value
elif consistency_level == 'ALL':
            # Read from every node (strong consistency)
responses = [self.data.get(key)]
for peer in self.peer_nodes:
responses.append(peer.data.get(key))
            # Verify that all nodes hold the same value
if len(set(responses)) > 1:
print(f"⚠️ Inconsistency detected for {key}: {set(responses)}")
value = responses[0]
print(f"🔍 {self.node_id}: Read {key}={value} (ALL)")
return value
def get_stats(self):
"""Estadísticas del nodo"""
return {
'node_id': self.node_id,
'keys': len(self.data),
'writes': len(self.write_log),
'data': dict(self.data)
}
# ===== DEMO =====
print("=== Eventual Consistency Demo ===\n")
# Create a 3-node cluster
node1 = EventuallyConsistentStore('Node1')
node2 = EventuallyConsistentStore('Node2')
node3 = EventuallyConsistentStore('Node3')
# Connect the nodes into a cluster
node1.connect_to_peer(node2)
node1.connect_to_peer(node3)
node2.connect_to_peer(node1)
node2.connect_to_peer(node3)
node3.connect_to_peer(node1)
node3.connect_to_peer(node2)
print("\n--- Scenario 1: Normal Operation ---")
node1.write('user:123', {'name': 'Alice', 'age': 30})
time.sleep(0.3)  # Wait for replication
# Read from different nodes
node1.read('user:123', 'ONE')
node2.read('user:123', 'ONE')  # Eventually consistent
node3.read('user:123', 'ONE')
print("\n--- Scenario 2: Read Before Replication ---")
node1.write('user:456', {'name': 'Bob', 'age': 25})
node2.read('user:456', 'ONE')  # May not be available yet
time.sleep(0.3)
node2.read('user:456', 'ONE')  # Now it has been replicated
print("\n--- Scenario 3: Conflicting Writes ---")
# Concurrent writes on different nodes
node1.write('user:789', {'name': 'Charlie', 'version': 1})
node2.write('user:789', {'name': 'Charlie Updated', 'version': 2})
time.sleep(0.5)  # Wait for convergence
# Last-Write-Wins: the most recent value wins
print("\nAfter convergence:")
for node in [node1, node2, node3]:
value = node.read('user:789', 'ONE')
print("\n--- Node Stats ---")
for node in [node1, node2, node3]:
print(f"{node.node_id}: {node.get_stats()}")
Conflict resolution strategies:
class ConflictResolution:
    """Different strategies for resolving conflicts"""
@staticmethod
def last_write_wins(values_with_timestamps):
"""LWW: El valor con timestamp más reciente gana"""
return max(values_with_timestamps, key=lambda x: x['timestamp'])
@staticmethod
def first_write_wins(values_with_timestamps):
"""FWW: El primer valor gana"""
return min(values_with_timestamps, key=lambda x: x['timestamp'])
@staticmethod
def merge_sets(values):
"""Para sets: union de todos los valores"""
result = set()
for value in values:
result.update(value)
return result
@staticmethod
def custom_merge(values, merge_function):
"""Merge custom definido por la aplicación"""
return merge_function(values)
@staticmethod
def vector_clock_resolution(values_with_clocks):
"""Usar vector clocks para detectar causalidad"""
# Implementación simplificada
for v1 in values_with_clocks:
is_dominated = False
for v2 in values_with_clocks:
if v1 != v2 and all(v1['clock'][k] <= v2['clock'].get(k, 0) for k in v1['clock']):
is_dominated = True
break
if not is_dominated:
                return v1  # This value is not dominated by any other
        return values_with_clocks[0]  # True conflict: fall back to another strategy
# Example: shopping cart with merge
class ShoppingCart:
    """Shopping cart with eventual consistency"""
    @staticmethod
    def merge_carts(cart1: set, cart2: set):
        """Cart merge: union of items"""
        # ADD-WINS semantics: we never lose items
        # (handling removals correctly requires tombstones, e.g. a CRDT OR-Set)
        return cart1.union(cart2)
cart_node1 = {'item1', 'item2'}
cart_node2 = {'item2', 'item3'}
merged = ShoppingCart.merge_carts(cart_node1, cart_node2)
print(f"Merged cart: {merged}") # {'item1', 'item2', 'item3'}
When to use eventual consistency:
- ✅ Social networks (likes and follows can lag)
- ✅ Shopping carts (better to over-add than to lose items)
- ✅ News feeds
- ✅ Recommendation systems
- ✅ CDNs (static content)
When NOT to use it (a CP-style alternative is sketched below):
- ❌ Financial transactions
- ❌ Inventory reservations
- ❌ Voting systems
- ❌ Critical permission management
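For those cases a CP-leaning write helps: it refuses to acknowledge success unless a majority of replicas confirm synchronously, trading availability under partitions for consistency. A minimal sketch reusing the store above (write_quorum is an illustrative helper, not part of the class):
def write_quorum(primary: EventuallyConsistentStore, key: str, value) -> bool:
    """Synchronous majority write: fail instead of acknowledging a lost update (CP behavior)."""
    cluster_size = len(primary.peer_nodes) + 1
    quorum = cluster_size // 2 + 1
    op = WriteOperation(key=key, value=value, timestamp=time.time(), node_id=primary.node_id)
    with primary.lock:
        primary.data[key] = value
        primary.key_timestamps[key] = op.timestamp
    acks = 1  # the local write counts as one acknowledgement
    for peer in primary.peer_nodes:
        try:
            peer._receive_replication(op)  # synchronous, unlike _replicate_async
            acks += 1
        except Exception:
            pass  # unreachable replica (e.g. a network partition)
    if acks < quorum:
        raise RuntimeError(f"Write rejected: only {acks}/{quorum} acks (unavailable under partition)")
    return True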
Recipe 2.5: Load Balancing and Caching
Load Balancing: distributing traffic across servers
from typing import List
from dataclasses import dataclass
import hashlib
import random
@dataclass
class Server:
"""Servidor backend"""
id: str
host: str
port: int
    weight: int = 1  # For weighted round-robin
    health: str = 'healthy'  # healthy, unhealthy
active_connections: int = 0
class LoadBalancer:
"""Load balancer con múltiples algoritmos"""
def __init__(self, servers: List[Server]):
self.servers = servers
self.round_robin_index = 0
self.request_count = 0
def get_server_round_robin(self) -> Server:
"""Round Robin: Rotar entre servidores equitativamente"""
healthy_servers = [s for s in self.servers if s.health == 'healthy']
if not healthy_servers:
raise Exception("No healthy servers")
server = healthy_servers[self.round_robin_index % len(healthy_servers)]
self.round_robin_index += 1
return server
def get_server_weighted_round_robin(self) -> Server:
"""Weighted Round Robin: Considerar capacidad del servidor"""
healthy_servers = [s for s in self.servers if s.health == 'healthy']
if not healthy_servers:
raise Exception("No healthy servers")
        # Build a list with repetitions based on weight
weighted_servers = []
for server in healthy_servers:
weighted_servers.extend([server] * server.weight)
server = weighted_servers[self.round_robin_index % len(weighted_servers)]
self.round_robin_index += 1
return server
def get_server_least_connections(self) -> Server:
"""Least Connections: Servidor con menos conexiones activas"""
healthy_servers = [s for s in self.servers if s.health == 'healthy']
if not healthy_servers:
raise Exception("No healthy servers")
return min(healthy_servers, key=lambda s: s.active_connections)
def get_server_ip_hash(self, client_ip: str) -> Server:
"""IP Hash: Mismo cliente siempre al mismo servidor (sticky sessions)"""
healthy_servers = [s for s in self.servers if s.health == 'healthy']
if not healthy_servers:
raise Exception("No healthy servers")
        # Consistent hash of the client IP
hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
index = hash_value % len(healthy_servers)
return healthy_servers[index]
def get_server_random(self) -> Server:
"""Random: Servidor aleatorio"""
healthy_servers = [s for s in self.servers if s.health == 'healthy']
if not healthy_servers:
raise Exception("No healthy servers")
return random.choice(healthy_servers)
    def health_check(self):
        """Health-check the servers"""
        for server in self.servers:
            # In production: an HTTP health check
            # Simulation: random failure
            if random.random() < 0.1:  # 10% chance of failure
server.health = 'unhealthy'
print(f"⚠️ {server.id} is unhealthy")
else:
server.health = 'healthy'
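# Least Connections only pays off if connection counts are kept up to date.
# A minimal usage sketch (handle_request is an illustrative helper):
def handle_request(lb: LoadBalancer, client_ip: str):
    """Pick the least-loaded server and track the connection for the request's duration."""
    server = lb.get_server_least_connections()
    server.active_connections += 1
    try:
        print(f"Handling request from {client_ip} on {server.id}")
        # ... proxy the request to server.host:server.port here ...
    finally:
        server.active_connections -= 1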
# ===== CACHING STRATEGIES =====
import time
from collections import OrderedDict
class LRUCache:
"""Least Recently Used Cache"""
def __init__(self, capacity: int):
self.cache = OrderedDict()
self.capacity = capacity
self.hits = 0
self.misses = 0
def get(self, key: str):
"""Obtener del cache"""
if key in self.cache:
            # Move to the end (most recently used)
self.cache.move_to_end(key)
self.hits += 1
return self.cache[key]
self.misses += 1
return None
def put(self, key: str, value):
"""Agregar al cache"""
if key in self.cache:
self.cache.move_to_end(key)
else:
if len(self.cache) >= self.capacity:
                # Evict the least recently used entry (the first one)
self.cache.popitem(last=False)
self.cache[key] = value
def get_stats(self):
"""Estadísticas de cache"""
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return {
'hits': self.hits,
'misses': self.misses,
'hit_rate': f'{hit_rate:.1f}%',
'size': len(self.cache)
}
class CacheAside:
"""Cache-Aside pattern (Lazy Loading)"""
def __init__(self, cache: LRUCache, database):
self.cache = cache
self.database = database
def get(self, key: str):
"""
1. Buscar en cache
2. Si no está, buscar en DB
3. Guardar en cache
"""
# 1. Buscar en cache
value = self.cache.get(key)
if value is not None:
print(f"✓ Cache HIT for {key}")
return value
        # 2. Cache MISS: fetch from the DB
        print(f"✗ Cache MISS for {key}, fetching from DB...")
        time.sleep(0.1)  # Simulate DB latency
value = self.database.get(key)
        # 3. Store in the cache
if value is not None:
self.cache.put(key, value)
return value
    def update(self, key: str, value):
        """
        Write-Through: update both the DB and the cache
        """
        # 1. Update the DB
        self.database[key] = value
        # 2. Update the cache
self.cache.put(key, value)
class WriteBackCache:
"""Write-Back pattern: escribir a DB asíncronamente"""
def __init__(self, cache: LRUCache, database, flush_interval=5):
self.cache = cache
self.database = database
self.dirty_keys = set()
self.flush_interval = flush_interval
# Background flusher
import threading
self.flusher_thread = threading.Thread(target=self._flush_periodically, daemon=True)
self.flusher_thread.start()
def write(self, key: str, value):
"""Escribir en cache (lazy DB write)"""
self.cache.put(key, value)
self.dirty_keys.add(key)
print(f"✓ Wrote {key} to cache (DB write pending)")
def _flush_periodically(self):
"""Flush cache a DB periódicamente"""
while True:
time.sleep(self.flush_interval)
self._flush()
def _flush(self):
"""Flush dirty keys a DB"""
if self.dirty_keys:
print(f"\n🔄 Flushing {len(self.dirty_keys)} keys to DB...")
for key in list(self.dirty_keys):
value = self.cache.get(key)
if value is not None:
self.database[key] = value
self.dirty_keys.clear()
print("✓ Flush complete")
# ===== CACHE INVALIDATION =====
class CacheInvalidation:
    """Cache invalidation strategies"""
@staticmethod
def ttl_based(cache_entry, ttl_seconds=300):
"""Time-To-Live: invalidar después de N segundos"""
age = time.time() - cache_entry['timestamp']
return age > ttl_seconds
@staticmethod
def event_based(cache, event_type, affected_keys):
"""Event-based: invalidar cuando cambian datos"""
for key in affected_keys:
cache.cache.pop(key, None)
print(f"✓ Invalidated {len(affected_keys)} keys due to {event_type}")
    @staticmethod
    def tag_based(cache, tags):
        """Tag-based: invalidate by category.
        Assumes cached values are dicts like {'data': ..., 'tags': ['users', 'profile']}."""
        stale = [
            key for key, value in list(cache.cache.items())
            if isinstance(value, dict) and set(value.get('tags', [])) & set(tags)
        ]
        for key in stale:
            cache.cache.pop(key, None)
        print(f"✓ Invalidated {len(stale)} keys tagged {tags}")
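# Usage sketch for the TTL strategy: each cached value carries the time it was written
# (the entry layout below is an assumption of this example, not a library contract).
profile_entry = {'value': {'name': 'Alice'}, 'timestamp': time.time()}
if CacheInvalidation.ttl_based(profile_entry, ttl_seconds=300):
    print("Entry expired, refetch from the DB")
else:
    print(f"Entry still fresh: {profile_entry['value']}")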
# ===== DEMO =====
print("=== Load Balancing Demo ===\n")
servers = [
Server('server1', '10.0.0.1', 8080, weight=3),
Server('server2', '10.0.0.2', 8080, weight=2),
Server('server3', '10.0.0.3', 8080, weight=1),
]
lb = LoadBalancer(servers)
# Round Robin
print("--- Round Robin ---")
for i in range(6):
server = lb.get_server_round_robin()
print(f"Request {i+1} → {server.id}")
# Weighted Round Robin
print("\n--- Weighted Round Robin ---")
lb.round_robin_index = 0
for i in range(6):
server = lb.get_server_weighted_round_robin()
print(f"Request {i+1} → {server.id}")
# IP Hash (sticky sessions)
print("\n--- IP Hash (Sticky Sessions) ---")
clients = ['192.168.1.1', '192.168.1.2', '192.168.1.1', '192.168.1.3', '192.168.1.1']
for i, client_ip in enumerate(clients):
server = lb.get_server_ip_hash(client_ip)
print(f"Request {i+1} from {client_ip} → {server.id}")
print("\n=== Caching Demo ===\n")
# Setup
database = {
'user:1': {'name': 'Alice', 'age': 30},
'user:2': {'name': 'Bob', 'age': 25},
'user:3': {'name': 'Charlie', 'age': 35}
}
cache = LRUCache(capacity=2)
cache_aside = CacheAside(cache, database)
# Cache-Aside pattern
print("--- Cache-Aside Pattern ---")
cache_aside.get('user:1') # MISS
cache_aside.get('user:1') # HIT
cache_aside.get('user:2') # MISS
cache_aside.get('user:3') # MISS (evicts user:1)
cache_aside.get('user:1') # MISS again
print(f"\nCache stats: {cache.get_stats()}")
# Write-Back pattern
print("\n--- Write-Back Pattern ---")
wb_cache = WriteBackCache(LRUCache(10), {})
wb_cache.write('config:theme', 'dark')
wb_cache.write('config:language', 'es')
print("Waiting for flush...")
time.sleep(6)
print(f"DB contents: {wb_cache.database}")
Caching strategies per layer:
| Layer | Example | Typical TTL |
|---|---|---|
| CDN | Cloudflare, Akamai | Hours/Days |
| Application | Redis, Memcached | Minutes/Hours |
| Database | Query cache | Seconds/Minutes |
| Browser | Local storage | Days/Weeks |
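What those TTLs look like in code differs per layer. A minimal sketch for the application and browser layers, assuming Flask and a local Redis (the endpoint and key names are illustrative):
import json
import redis
from flask import Flask, jsonify
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
@app.route('/profile/<user_id>')
def profile(user_id):
    # Application layer: Redis entry with a minutes-scale TTL
    cached = r.get(f'profile:{user_id}')
    if cached is None:
        cached = json.dumps({'id': user_id, 'name': 'Alice'})  # stand-in for a DB fetch
        r.setex(f'profile:{user_id}', 300, cached)  # 5-minute TTL
    response = jsonify(json.loads(cached))
    # Browser layer: days-scale TTL via the Cache-Control header
    response.headers['Cache-Control'] = 'max-age=86400'
    return response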
Databases
Recipe 2.6: Advanced SQL - Optimization
Indexes: the secret weapon of performance
-- ===== INDEXES =====
-- Example table
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE,
name VARCHAR(100),
age INTEGER,
city VARCHAR(100),
created_at TIMESTAMP DEFAULT NOW()
);
-- ❌ WITHOUT INDEX: full table scan - O(n)
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Seq Scan on users (cost=0.00..1234.00 rows=1)
-- ✅ WITH INDEX: index scan - O(log n)
CREATE INDEX idx_users_email ON users(email);
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Index Scan using idx_users_email (cost=0.42..8.44 rows=1)
-- COMPOSITE INDEXES (multiple columns)
CREATE INDEX idx_users_city_age ON users(city, age);
-- ✅ GOOD: uses the index
SELECT * FROM users WHERE city = 'NYC' AND age > 25;
-- ⚠️ LEFTMOST-PREFIX RULE: a filter on age alone cannot use the (city, age) index efficiently
SELECT * FROM users WHERE age > 25; -- age is not the leading column of the index
-- ❌ BAD: cannot use the index (function applied to the column)
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';
-- ✅ FIX: functional index
CREATE INDEX idx_users_email_lower ON users(LOWER(email));
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';
-- PARTIAL INDEX (only a subset of the rows)
CREATE INDEX idx_active_users ON users(city) WHERE created_at > '2024-01-01';
-- Useful for queries that filter on recent data
SELECT * FROM users WHERE city = 'LA' AND created_at > '2024-01-01';
-- COVERING INDEX (includes extra columns)
CREATE INDEX idx_users_city_covering ON users(city) INCLUDE (name, age);
-- Query satisfied entirely from the index (index-only scan, no table read)
SELECT name, age FROM users WHERE city = 'Boston';
Window Functions: powerful analytics
-- ===== WINDOW FUNCTIONS =====
CREATE TABLE sales (
id SERIAL PRIMARY KEY,
salesperson VARCHAR(100),
region VARCHAR(50),
amount DECIMAL(10, 2),
sale_date DATE
);
-- ROW_NUMBER: number the rows
SELECT
salesperson,
amount,
ROW_NUMBER() OVER (ORDER BY amount DESC) as rank
FROM sales;
-- RANK with ties
SELECT
salesperson,
amount,
RANK() OVER (ORDER BY amount DESC) as rank,
DENSE_RANK() OVER (ORDER BY amount DESC) as dense_rank
FROM sales;
-- PARTITION BY: separate windows per group
SELECT
salesperson,
region,
amount,
RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region
FROM sales;
-- Running totals (cumulative sum)
SELECT
sale_date,
amount,
SUM(amount) OVER (ORDER BY sale_date) as running_total
FROM sales
ORDER BY sale_date;
-- Moving average (7-day window)
SELECT
sale_date,
amount,
AVG(amount) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7days
FROM sales;
-- LAG and LEAD: access previous/next rows
SELECT
sale_date,
amount,
LAG(amount, 1) OVER (ORDER BY sale_date) as prev_day,
LEAD(amount, 1) OVER (ORDER BY sale_date) as next_day,
amount - LAG(amount, 1) OVER (ORDER BY sale_date) as day_over_day_change
FROM sales;
-- NTILE: split into quartiles/percentiles
SELECT
salesperson,
amount,
NTILE(4) OVER (ORDER BY amount) as quartile
FROM sales;
Common Table Expressions (CTEs): readable queries
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# Setup
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
session = Session()
def demonstrate_ctes():
    """CTEs for complex yet readable queries"""
    # ===== SIMPLE CTE =====
query = text("""
WITH high_value_customers AS (
SELECT
customer_id,
SUM(order_total) as lifetime_value
FROM orders
GROUP BY customer_id
HAVING SUM(order_total) > 10000
)
SELECT
c.name,
c.email,
hvc.lifetime_value
FROM customers c
JOIN high_value_customers hvc ON c.id = hvc.customer_id
ORDER BY hvc.lifetime_value DESC;
""")
    # ===== RECURSIVE CTE: hierarchies =====
org_chart_query = text("""
    -- Employees table with a manager_id column
WITH RECURSIVE employee_hierarchy AS (
        -- Base case: the CEO (no manager)
SELECT
id,
name,
manager_id,
1 as level,
name as path
FROM employees
WHERE manager_id IS NULL
UNION ALL
        -- Recursive case: employees with managers
SELECT
e.id,
e.name,
e.manager_id,
eh.level + 1,
eh.path || ' > ' || e.name
FROM employees e
JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
level,
name,
path
FROM employee_hierarchy
ORDER BY level, name;
""")
    # ===== MULTIPLE CTEs =====
complex_query = text("""
WITH
    -- CTE 1: Monthly sales
monthly_sales AS (
SELECT
DATE_TRUNC('month', sale_date) as month,
SUM(amount) as total_sales
FROM sales
GROUP BY DATE_TRUNC('month', sale_date)
),
    -- CTE 2: Moving average
sales_with_avg AS (
SELECT
month,
total_sales,
AVG(total_sales) OVER (
ORDER BY month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3months
FROM monthly_sales
),
-- CTE 3: Growth rate
sales_with_growth AS (
SELECT
month,
total_sales,
moving_avg_3months,
(total_sales - LAG(total_sales) OVER (ORDER BY month)) /
LAG(total_sales) OVER (ORDER BY month) * 100 as growth_rate
FROM sales_with_avg
)
SELECT * FROM sales_with_growth
WHERE growth_rate IS NOT NULL
ORDER BY month DESC;
""")
results = session.execute(complex_query)
return results.fetchall()
# ===== QUERY OPTIMIZATION =====
def query_optimization_tips():
    """Tips for optimizing queries"""
    # ❌ BAD: N+1 queries
    users = session.execute(text("SELECT * FROM users")).fetchall()
    for user in users:
        # One separate query per user (N queries)
orders = session.execute(
text("SELECT * FROM orders WHERE user_id = :user_id"),
{"user_id": user.id}
).fetchall()
    # ✅ GOOD: JOIN (1 query)
results = session.execute(text("""
SELECT
u.*,
o.*
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
""")).fetchall()
    # ❌ BAD: SELECT *
    session.execute(text("SELECT * FROM users"))  # Fetches every column
    # ✅ GOOD: select only the columns you need
    session.execute(text("SELECT id, email, name FROM users"))
    # ❌ BAD: OR across different columns often prevents efficient index use
session.execute(text("""
SELECT * FROM products
WHERE category = 'electronics' OR brand = 'Apple'
"""))
    # ✅ GOOD: UNION of two indexed queries
session.execute(text("""
SELECT * FROM products WHERE category = 'electronics'
UNION
SELECT * FROM products WHERE brand = 'Apple'
"""))
    # ✅ BETTER: if the condition is AND, use a composite index
    # CREATE INDEX idx_products_category_brand ON products(category, brand)
session.execute(text("""
SELECT * FROM products
WHERE category = 'electronics' AND brand = 'Apple'
"""))
Transactions and isolation levels:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
@contextmanager
def transaction_scope(isolation_level='READ COMMITTED'):
"""Context manager para transacciones"""
session = Session()
session.connection(execution_options={"isolation_level": isolation_level})
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise
finally:
session.close()
# ===== ACID PROPERTIES =====
def demonstrate_transactions():
    """Demonstrate ACID properties"""
    # ATOMICITY: all or nothing
with transaction_scope() as session:
session.execute(text("""
UPDATE accounts SET balance = balance - 100 WHERE id = 1
"""))
session.execute(text("""
UPDATE accounts SET balance = balance + 100 WHERE id = 2
"""))
        # Both operations happen or neither does (commit/rollback)
    # ISOLATION LEVELS
    # 1. READ UNCOMMITTED (dirty reads possible; PostgreSQL treats it as READ COMMITTED)
    with transaction_scope('READ UNCOMMITTED') as session:
        # May see uncommitted changes from other transactions
pass
    # 2. READ COMMITTED (PostgreSQL's default)
    with transaction_scope('READ COMMITTED') as session:
        # Only sees committed changes,
        # but repeated reads may return different data
pass
    # 3. REPEATABLE READ
    with transaction_scope('REPEATABLE READ') as session:
        # Repeated reads within the transaction see the same data
        # The SQL standard still allows phantom reads (new rows)
pass
    # 4. SERIALIZABLE (strictest)
    with transaction_scope('SERIALIZABLE') as session:
        # Transactions execute as if they ran sequentially
        # No phantom reads, but they can fail with serialization errors
pass
# ===== DEADLOCKS AND OPTIMISTIC LOCKING =====
def handle_deadlocks():
    """Handle deadlocks with retries"""
from sqlalchemy.exc import OperationalError
import time
max_retries = 3
for attempt in range(max_retries):
try:
with transaction_scope() as session:
                # Operations that can cause a deadlock
session.execute(text("""
UPDATE inventory
SET quantity = quantity - 1
WHERE product_id = :id
"""), {"id": 123})
session.execute(text("""
INSERT INTO order_items (product_id, quantity)
VALUES (:id, 1)
"""), {"id": 123})
            break  # Success
except OperationalError as e:
if 'deadlock detected' in str(e).lower():
if attempt < max_retries - 1:
print(f"Deadlock detected, retrying... (attempt {attempt + 1})")
                    time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
else:
raise
# Optimistic locking with a version number
class ProductWithVersion:
"""
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10,2),
version INTEGER DEFAULT 1
)
"""
@staticmethod
def update_with_optimistic_lock(product_id, new_price, expected_version):
with transaction_scope() as session:
result = session.execute(text("""
UPDATE products
SET price = :new_price, version = version + 1
WHERE id = :id AND version = :expected_version
RETURNING version
"""), {
"new_price": new_price,
"id": product_id,
"expected_version": expected_version
})
if result.rowcount == 0:
raise Exception("Optimistic lock failed: product was modified")
            return result.fetchone()[0]  # New version
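A typical caller wraps the optimistic update in a read-retry loop: re-read the current version after each conflict and try again. A minimal sketch under those assumptions (update_price_with_retry is an illustrative helper):
def update_price_with_retry(product_id: int, new_price: float, max_retries: int = 3):
    """Read-modify-write loop around the optimistic lock."""
    for _ in range(max_retries):
        with transaction_scope() as session:
            row = session.execute(
                text("SELECT version FROM products WHERE id = :id"),
                {"id": product_id}
            ).fetchone()
        try:
            return ProductWithVersion.update_with_optimistic_lock(
                product_id, new_price, expected_version=row[0]
            )
        except Exception:
            continue  # someone else updated the row first; re-read and retry
    raise RuntimeError("Gave up after repeated optimistic lock conflicts")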
Recipe 2.7: NoSQL - When and How to Use It
SQL vs NoSQL comparison:
| Feature | SQL | NoSQL |
|---|---|---|
| Schema | Rigid (tables) | Flexible (documents, key-value) |
| Scalability | Vertical | Horizontal |
| Transactions | Full ACID | Eventual consistency (mostly) |
| Joins | ✅ Efficient | ❌ Limited/avoid |
| Use cases | Finance, ERP | Social networks, IoT, logs |
MongoDB: Document Store
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
import json
# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
# ===== BASIC OPERATIONS =====
def mongodb_crud():
    """CRUD in MongoDB"""
products = db['products']
# CREATE
product = {
'name': 'Laptop Pro',
'price': 1299.99,
'specs': {
'cpu': 'Intel i7',
'ram': '16GB',
'storage': '512GB SSD'
},
'tags': ['electronics', 'computers'],
'in_stock': True,
'created_at': datetime.now()
}
result = products.insert_one(product)
print(f"Inserted ID: {result.inserted_id}")
# READ
laptop = products.find_one({'name': 'Laptop Pro'})
print(f"Found: {laptop}")
# UPDATE
products.update_one(
{'name': 'Laptop Pro'},
{'$set': {'price': 1199.99, 'in_stock': False}}
)
# DELETE
products.delete_one({'name': 'Laptop Pro'})
# BULK INSERT
products.insert_many([
{'name': 'Mouse', 'price': 29.99, 'category': 'accessories'},
{'name': 'Keyboard', 'price': 79.99, 'category': 'accessories'},
])
# ===== ADVANCED QUERIES =====
def mongodb_queries():
    """Complex queries in MongoDB"""
    products = db['products']
    # Queries with operators
    expensive = products.find({'price': {'$gt': 100}})  # Greater than
electronics = products.find({'tags': {'$in': ['electronics', 'computers']}})
    # Text search (requires a text index)
products.create_index([('name', 'text'), ('description', 'text')])
search_results = products.find({'$text': {'$search': 'laptop pro'}})
# Aggregation pipeline
pipeline = [
        # Stage 1: Filter
{'$match': {'in_stock': True}},
        # Stage 2: Group
{'$group': {
'_id': '$category',
'total_products': {'$sum': 1},
'avg_price': {'$avg': '$price'},
'max_price': {'$max': '$price'}
}},
        # Stage 3: Sort
{'$sort': {'avg_price': -1}},
        # Stage 4: Limit
{'$limit': 10}
]
results = list(products.aggregate(pipeline))
print(f"Aggregation results: {json.dumps(results, indent=2, default=str)}")
# Lookup (join)
orders = db['orders']
pipeline = [
{'$lookup': {
'from': 'products',
'localField': 'product_id',
'foreignField': '_id',
'as': 'product_details'
}},
{'$unwind': '$product_details'}
]
enriched_orders = list(orders.aggregate(pipeline))
# ===== DATA MODELING =====
def data_modeling_patterns():
    """Data modeling patterns in MongoDB"""
    # PATTERN 1: Embedding (denormalization)
    # Use when: one-to-few relationship, data is read together
user_with_address = {
'_id': 'user_123',
'name': 'Alice',
'email': 'alice@example.com',
'address': { # Embedded document
'street': '123 Main St',
'city': 'NYC',
'zip': '10001'
},
'recent_orders': [ # Embedded array
{'order_id': 'ord_1', 'total': 99.99},
{'order_id': 'ord_2', 'total': 149.99}
]
}
    # PATTERN 2: Referencing (normalization)
    # Use when: one-to-many relationship, data is updated independently
user = {
'_id': 'user_123',
'name': 'Alice',
'email': 'alice@example.com'
}
orders = [
{'_id': 'ord_1', 'user_id': 'user_123', 'total': 99.99},
{'_id': 'ord_2', 'user_id': 'user_123', 'total': 149.99}
]
    # PATTERN 3: Hybrid (combination)
    # Denormalize the critical data, reference the rest
blog_post = {
'_id': 'post_456',
'title': 'MongoDB Best Practices',
        'author': {  # Denormalize the author's basic data
'id': 'user_123',
'name': 'Alice',
'avatar': 'url'
},
        'comments_count': 42,  # Denormalized count
        'comment_ids': ['cmt_1', 'cmt_2']  # Reference the full comments
}
    # PATTERN 4: Bucketing (for time-series data)
    # Group multiple events into a single document
sensor_readings_bucket = {
'_id': 'sensor_123_2024-01-15_10',
'sensor_id': 'sensor_123',
'date': datetime(2024, 1, 15, 10),
'readings': [
{'time': '10:00', 'temp': 22.5, 'humidity': 45},
{'time': '10:01', 'temp': 22.6, 'humidity': 46},
            # ... up to 60 readings (1 per minute)
]
}
# ===== INDEXES AND PERFORMANCE =====
def mongodb_indexes():
    """Indexes in MongoDB"""
products = db['products']
    # Simple index
products.create_index([('price', ASCENDING)])
    # Compound index
products.create_index([
('category', ASCENDING),
('price', DESCENDING)
])
    # Unique index
users = db['users']
users.create_index([('email', ASCENDING)], unique=True)
    # TTL index (auto-delete after N seconds)
sessions = db['sessions']
sessions.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)
    # Text index
products.create_index([('name', 'text'), ('description', 'text')])
    # List indexes
print("Indexes:", list(products.list_indexes()))
    # Explain to inspect index usage
explain = products.find({'price': {'$gt': 100}}).explain()
print(f"Query plan: {explain['executionStats']}")
Redis: In-Memory Key-Value Store
import redis
import json
import time
from datetime import timedelta
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ===== BASIC OPERATIONS =====
def redis_basics():
    """Basic Redis operations"""
# STRING operations
r.set('user:1000:name', 'Alice')
r.set('user:1000:email', 'alice@example.com')
name = r.get('user:1000:name')
print(f"Name: {name}")
    # SET with expiration
r.setex('session:abc123', timedelta(hours=1), 'user_data')
    # Atomic increment
r.set('page:views', 0)
r.incr('page:views')
r.incrby('page:views', 10)
views = r.get('page:views')
# HASH operations (objects)
r.hset('user:1000', mapping={
'name': 'Alice',
'email': 'alice@example.com',
'age': '30'
})
user = r.hgetall('user:1000')
print(f"User: {user}")
r.hincrby('user:1000', 'age', 1) # Increment age
# LIST operations (queues)
r.lpush('queue:emails', 'email1@example.com') # Push left
r.rpush('queue:emails', 'email2@example.com') # Push right
email = r.lpop('queue:emails') # Pop left (FIFO)
print(f"Processing: {email}")
# SET operations (unique values)
r.sadd('tags:post:123', 'python', 'redis', 'nosql')
r.sadd('tags:post:456', 'python', 'mongodb')
    # Intersection (common tags)
common_tags = r.sinter('tags:post:123', 'tags:post:456')
print(f"Common tags: {common_tags}")
# SORTED SET (leaderboards)
r.zadd('leaderboard', {
'Alice': 1500,
'Bob': 1200,
'Charlie': 1800
})
# Top 3
top3 = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(f"Leaderboard: {top3}")
    # A single user's rank
rank = r.zrevrank('leaderboard', 'Alice')
print(f"Alice's rank: {rank + 1}")
# ===== USE CASES =====
class RateLimiter:
    """Rate limiter backed by Redis"""
    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, user_id: str, max_requests: int = 10, window_seconds: int = 60):
        """
        Sliding-window rate limiter.
        Allows max_requests per window_seconds.
        Note: the count-then-add sequence below is not atomic under
        concurrency; see the pipeline-based variant after this class.
        """
        key = f"rate_limit:{user_id}"
        now = time.time()
        window_start = now - window_seconds
        # Drop requests that fell out of the window
        self.redis.zremrangebyscore(key, 0, window_start)
        # Count the requests still inside the window
        request_count = self.redis.zcard(key)
        if request_count < max_requests:
            # Record the current request; the member must be unique so
            # that two requests in the same second both count
            self.redis.zadd(key, {f"{now}-{time.time_ns()}": now})
            self.redis.expire(key, window_seconds)
            return True
        return False
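The check above can race under concurrency: two clients can both pass the zcard check and exceed the limit. A sketch of an atomic variant using a MULTI/EXEC pipeline, where we add first and count afterwards (an over-limit request removes its own entry again):
def is_allowed_atomic(redis_client, user_id: str, max_requests: int = 10, window_seconds: int = 60) -> bool:
    """Sliding-window limiter where trim/add/count run as one transaction"""
    key = f"rate_limit:{user_id}"
    now = time.time()
    member = f"{now}-{time.time_ns()}"  # unique member per request
    pipe = redis_client.pipeline()  # MULTI
    pipe.zremrangebyscore(key, 0, now - window_seconds)  # trim old entries
    pipe.zadd(key, {member: now})                        # record this request
    pipe.zcard(key)                                      # count inside window
    pipe.expire(key, window_seconds)
    _, _, count, _ = pipe.execute()                      # EXEC
    if count > max_requests:
        redis_client.zrem(key, member)  # roll back our own entry
        return False
    return True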
class DistributedLock:
    """Distributed lock backed by Redis"""
    def __init__(self, redis_client):
        self.redis = redis_client

    def acquire_lock(self, lock_name: str, timeout: int = 10):
        """Acquire the lock"""
        lock_key = f"lock:{lock_name}"
        # SET NX (only if the key does not exist) with an expiration
        acquired = self.redis.set(
            lock_key,
            'locked',
            nx=True,
            ex=timeout
        )
        return acquired

    def release_lock(self, lock_name: str):
        """Release the lock (unconditionally; see the token-based variant below)"""
        self.redis.delete(f"lock:{lock_name}")

    def with_lock(self, lock_name: str):
        """Context manager wrapping acquire/release"""
        from contextlib import contextmanager
        @contextmanager
        def _lock():
            acquired = self.acquire_lock(lock_name)
            if not acquired:
                raise Exception(f"Could not acquire lock: {lock_name}")
            try:
                yield
            finally:
                self.release_lock(lock_name)
        return _lock()
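As written, release_lock deletes the key even if the lock has expired and another client already re-acquired it. A common hardening is to store a random token and release through a Lua script that only deletes the key while the token still matches; a sketch (helper names are illustrative):
import uuid

RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""

def acquire_lock_safe(redis_client, lock_name: str, timeout: int = 10):
    """Acquire the lock; returns the owner token on success, None otherwise"""
    token = str(uuid.uuid4())
    ok = redis_client.set(f"lock:{lock_name}", token, nx=True, ex=timeout)
    return token if ok else None

def release_lock_safe(redis_client, lock_name: str, token: str) -> bool:
    """Release only if we still own the lock (token matches)"""
    released = redis_client.eval(RELEASE_SCRIPT, 1, f"lock:{lock_name}", token)
    return released == 1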
class SessionStore:
    """Session storage backed by Redis"""
    def __init__(self, redis_client):
        self.redis = redis_client

    def create_session(self, user_id: str, session_data: dict, ttl: int = 3600):
        """Create a session"""
        import uuid
        session_id = str(uuid.uuid4())
        self.redis.setex(
            f"session:{session_id}",
            ttl,
            json.dumps({**session_data, 'user_id': user_id})
        )
        return session_id

    def get_session(self, session_id: str):
        """Fetch a session"""
        data = self.redis.get(f"session:{session_id}")
        return json.loads(data) if data else None

    def extend_session(self, session_id: str, ttl: int = 3600):
        """Extend a session's TTL"""
        self.redis.expire(f"session:{session_id}", ttl)

    def delete_session(self, session_id: str):
        """Delete a session"""
        self.redis.delete(f"session:{session_id}")
# Using the components
rate_limiter = RateLimiter(r)
for i in range(15):
allowed = rate_limiter.is_allowed('user_123', max_requests=10, window_seconds=60)
print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Rate limited'}")
distributed_lock = DistributedLock(r)
with distributed_lock.with_lock('critical_section'):
print("Doing critical work...")
time.sleep(1)
session_store = SessionStore(r)
sid = session_store.create_session('user_456', {'theme': 'dark'})
print(f"Session created: {sid}")
session = session_store.get_session(sid)
print(f"Session data: {session}")
When to use each type of NoSQL store:
Document store (MongoDB, CouchDB)
- ✅ Flexible schemas that evolve over time
- ✅ Hierarchical (JSON-like) data
- ✅ Content management, product catalogs
Key-value (Redis, DynamoDB)
- ✅ Caching
- ✅ Session storage
- ✅ Real-time analytics
- ✅ Rate limiting
Column-family (Cassandra, HBase)
- ✅ Time-series data
- ✅ Logs at massive scale
- ✅ IoT sensor data
Graph (Neo4j, ArangoDB)
- ✅ Social networks
- ✅ Recommendation engines
- ✅ Fraud detection
- ✅ Knowledge graphs
APIs and Inter-Service Communication
Recipe 2.8: REST vs GraphQL vs gRPC
Comparison:
| Feature | REST | GraphQL | gRPC |
|---|---|---|---|
| Protocol | HTTP/JSON | HTTP/JSON | HTTP/2 + Protobuf |
| Queries | Multiple endpoints | Single endpoint | RPC calls |
| Over/under-fetching | ❌ Common | ✅ Solves it | ✅ Avoids it |
| Performance | 🟡 Medium | 🟡 Medium | 🟢 High |
| Typing | ❌ Weak | ✅ Strong | ✅ Strong |
| Streaming | ❌ Hard | ⚠️ Subscriptions | ✅ Bidirectional |
| Caching | ✅ HTTP cache | ❌ Complex | ❌ Custom |
| Learning curve | 🟢 Easy | 🟡 Medium | 🔴 Steep |
A complete REST API:
from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
# ===== AUTHENTICATION & AUTHORIZATION =====
def token_required(f):
    """Decorator that protects endpoints with JWT auth"""
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token missing'}), 401
        try:
            # Header looks like: Bearer eyJhbGc...
            token = token.split(' ')[1]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']
        except (jwt.InvalidTokenError, IndexError, KeyError):
            return jsonify({'error': 'Token invalid'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
@app.route('/auth/login', methods=['POST'])
def login():
"""Login y generar JWT"""
data = request.json
# Validar credenciales (simplificado)
if data.get('email') == 'user@example.com' and data.get('password') == 'password':
token = jwt.encode({
'user_id': 'user_123',
'exp': datetime.utcnow() + timedelta(hours=24)
}, app.config['SECRET_KEY'])
return jsonify({'token': token}), 200
return jsonify({'error': 'Invalid credentials'}), 401
# ===== RESOURCE ENDPOINTS =====
# Simulated database
products = {
'1': {'id': '1', 'name': 'Laptop', 'price': 999.99, 'stock': 10},
'2': {'id': '2', 'name': 'Mouse', 'price': 29.99, 'stock': 50}
}
@app.route('/api/v1/products', methods=['GET'])
def get_products():
"""
GET /api/v1/products?page=1&limit=10&sort=price&order=desc&category=electronics
Query params:
- page: pagination
- limit: items per page
- sort: field to sort by
- order: asc or desc
- filters: category, price_min, price_max, etc.
"""
# Pagination
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
# Filtering
category = request.args.get('category')
price_min = request.args.get('price_min', type=float)
price_max = request.args.get('price_max', type=float)
# Sorting
sort_by = request.args.get('sort', 'id')
order = request.args.get('order', 'asc')
# Apply filters (explicit None checks, so 0 is a valid bound)
filtered_products = list(products.values())
if price_min is not None:
filtered_products = [p for p in filtered_products if p['price'] >= price_min]
if price_max is not None:
filtered_products = [p for p in filtered_products if p['price'] <= price_max]
# Apply sorting (unknown sort fields fall back to 'id')
filtered_products.sort(
key=lambda x: x.get(sort_by, x['id']),
reverse=(order == 'desc')
)
# Pagination
start = (page - 1) * limit
end = start + limit
paginated = filtered_products[start:end]
# Response with metadata
return jsonify({
'data': paginated,
'metadata': {
'page': page,
'limit': limit,
'total': len(filtered_products),
'total_pages': (len(filtered_products) + limit - 1) // limit
},
'links': {
'self': f'/api/v1/products?page={page}&limit={limit}',
'next': f'/api/v1/products?page={page+1}&limit={limit}' if end < len(filtered_products) else None,
'prev': f'/api/v1/products?page={page-1}&limit={limit}' if page > 1 else None
}
}), 200
@app.route('/api/v1/products/<product_id>', methods=['GET'])
def get_product(product_id):
"""GET individual product"""
product = products.get(product_id)
if not product:
return jsonify({'error': 'Product not found'}), 404
return jsonify(product), 200
@app.route('/api/v1/products', methods=['POST'])
@token_required
def create_product(current_user):
"""
POST /api/v1/products
Body:
{
"name": "Keyboard",
"price": 79.99,
"stock": 30
}
"""
data = request.json
# Validation
if not data.get('name') or not data.get('price'):
return jsonify({'error': 'Missing required fields'}), 400
# Create product
product_id = str(len(products) + 1)
product = {
'id': product_id,
'name': data['name'],
'price': data['price'],
'stock': data.get('stock', 0)
}
products[product_id] = product
# Return 201 Created with Location header
return jsonify(product), 201, {'Location': f'/api/v1/products/{product_id}'}
@app.route('/api/v1/products/<product_id>', methods=['PUT'])
@token_required
def update_product(current_user, product_id):
"""PUT (replace entire resource)"""
if product_id not in products:
return jsonify({'error': 'Product not found'}), 404
data = request.json
products[product_id] = {
'id': product_id,
'name': data['name'],
'price': data['price'],
'stock': data.get('stock', 0)
}
return jsonify(products[product_id]), 200
@app.route('/api/v1/products/<product_id>', methods=['PATCH'])
@token_required
def patch_product(current_user, product_id):
"""PATCH (partial update)"""
if product_id not in products:
return jsonify({'error': 'Product not found'}), 404
data = request.json
product = products[product_id]
# Update only provided fields
if 'name' in data:
product['name'] = data['name']
if 'price' in data:
product['price'] = data['price']
if 'stock' in data:
product['stock'] = data['stock']
return jsonify(product), 200
@app.route('/api/v1/products/<product_id>', methods=['DELETE'])
@token_required
def delete_product(current_user, product_id):
"""DELETE product"""
if product_id not in products:
return jsonify({'error': 'Product not found'}), 404
del products[product_id]
return '', 204 # No content
# ===== ERROR HANDLING =====
@app.errorhandler(404)
def not_found(error):
return jsonify({
'error': 'Not Found',
'message': 'The requested resource was not found'
}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'error': 'Internal Server Error',
'message': 'An unexpected error occurred'
}), 500
# ===== RATE LIMITING =====
import time
def rate_limit(max_requests=10, window=60):
"""Simple in-memory rate limiter"""
requests = {}
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
client_ip = request.remote_addr
current_time = time.time()
# Clean old requests
if client_ip in requests:
requests[client_ip] = [
t for t in requests[client_ip]
if current_time - t < window
]
else:
requests[client_ip] = []
# Check limit
if len(requests[client_ip]) >= max_requests:
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': window
}), 429
# Add request
requests[client_ip].append(current_time)
# Set headers
response = f(*args, **kwargs)
if isinstance(response, tuple):
resp, status = response[0], response[1]
else:
resp, status = response, 200
# Return with rate limit headers
return resp, status, {
'X-RateLimit-Limit': max_requests,
'X-RateLimit-Remaining': max_requests - len(requests[client_ip]),
'X-RateLimit-Reset': int(current_time + window)
}
return decorated
return decorator
@app.route('/api/v1/limited', methods=['GET'])
@rate_limit(max_requests=5, window=60)
def limited_endpoint():
return jsonify({'message': 'This endpoint is rate limited'}), 200
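A quick client-side check of the paginated endpoint; a sketch that assumes the Flask app is running locally on port 5000 (host and port are illustrative):
import requests

resp = requests.get(
    'http://localhost:5000/api/v1/products',
    params={'page': 1, 'limit': 10, 'sort': 'price', 'order': 'desc', 'price_min': 20}
)
body = resp.json()
print(f"{body['metadata']['total']} products, page {body['metadata']['page']}")
for product in body['data']:
    print(f" - {product['name']}: ${product['price']}")
print("Next page:", body['links']['next'])  # HATEOAS-style link from the response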
GraphQL API:
import graphene
from graphene import ObjectType, String, Float, Int, List, Field, Schema
# ===== TYPES =====
class ProductType(ObjectType):
id = String()
name = String()
price = Float()
stock = Int()
class OrderItemType(ObjectType):
product = Field(ProductType)
quantity = Int()
subtotal = Float()
class OrderType(ObjectType):
id = String()
user_id = String()
items = List(OrderItemType)
total = Float()
status = String()
# ===== QUERIES =====
class Query(ObjectType):
# Single product
product = Field(
ProductType,
id=String(required=True),
description="Get product by ID"
)
# List products with filtering
products = List(
ProductType,
category=String(),
price_min=Float(),
price_max=Float(),
limit=Int(default_value=10)
)
# Order with nested products
order = Field(
OrderType,
id=String(required=True)
)
def resolve_product(self, info, id):
"""Resolver for a single product"""
return products.get(id)
def resolve_products(self, info, category=None, price_min=None, price_max=None, limit=10):
"""Resolver for the product list"""
result = list(products.values())
if price_min is not None:
result = [p for p in result if p['price'] >= price_min]
if price_max is not None:
result = [p for p in result if p['price'] <= price_max]
return result[:limit]
def resolve_order(self, info, id):
"""Resolver for an order (note the N+1 problem)"""
# In production: batch product lookups with a DataLoader
# (see the sketch after the example queries below)
order = orders.get(id)
if not order:
return None
# Load the products for the order's items
enriched_items = []
for item in order['items']:
product = products.get(item['product_id'])
enriched_items.append({
'product': product,
'quantity': item['quantity'],
'subtotal': product['price'] * item['quantity']
})
return {
**order,
'items': enriched_items
}
# ===== MUTATIONS =====
class CreateProduct(graphene.Mutation):
class Arguments:
name = String(required=True)
price = Float(required=True)
stock = Int(default_value=0)
product = Field(ProductType)
def mutate(self, info, name, price, stock=0):
product_id = str(len(products) + 1)
product = {
'id': product_id,
'name': name,
'price': price,
'stock': stock
}
products[product_id] = product
return CreateProduct(product=product)
class UpdateProduct(graphene.Mutation):
class Arguments:
id = String(required=True)
name = String()
price = Float()
stock = Int()
product = Field(ProductType)
def mutate(self, info, id, name=None, price=None, stock=None):
if id not in products:
raise Exception('Product not found')
product = products[id]
# Explicit None checks, so 0 is a valid price/stock update
if name is not None:
product['name'] = name
if price is not None:
product['price'] = price
if stock is not None:
product['stock'] = stock
return UpdateProduct(product=product)
class Mutation(ObjectType):
create_product = CreateProduct.Field()
update_product = UpdateProduct.Field()
# ===== SCHEMA =====
schema = Schema(query=Query, mutation=Mutation)
# ===== FLASK INTEGRATION =====
from flask import Flask
from flask_graphql import GraphQLView
app = Flask(__name__)
app.add_url_rule(
'/graphql',
view_func=GraphQLView.as_view(
'graphql',
schema=schema,
graphiql=True # GraphiQL IDE
)
)
# Example GraphQL queries:
"""
query GetProducts {
products(priceMin: 20, priceMax: 100, limit: 5) {
id
name
price
stock
}
}
query GetOrder {
order(id: "1") {
id
total
items {
product {
name
price
}
quantity
subtotal
}
}
}
mutation CreateProduct {
createProduct(name: "Monitor", price: 299.99, stock: 15) {
product {
id
name
price
}
}
}
"""
gRPC with Protocol Buffers:
// products.proto
syntax = "proto3";
package products;
service ProductService {
rpc GetProduct (ProductRequest) returns (Product);
rpc ListProducts (ListProductsRequest) returns (ProductList);
rpc CreateProduct (CreateProductRequest) returns (Product);
rpc StreamOrders (StreamOrdersRequest) returns (stream Order);
}
message Product {
string id = 1;
string name = 2;
double price = 3;
int32 stock = 4;
}
message ProductRequest {
string id = 1;
}
message ListProductsRequest {
int32 page = 1;
int32 limit = 2;
double price_min = 3;
double price_max = 4;
}
message ProductList {
repeated Product products = 1;
int32 total = 2;
}
message CreateProductRequest {
string name = 1;
double price = 2;
int32 stock = 3;
}
message Order {
string id = 1;
string user_id = 2;
double total = 3;
string status = 4;
}
message StreamOrdersRequest {
string user_id = 1;
}
# Generate the Python code: python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. products.proto
import grpc
from concurrent import futures
import products_pb2
import products_pb2_grpc
import time
# ===== SERVER =====
class ProductServicer(products_pb2_grpc.ProductServiceServicer):
"""Implementación del servicio gRPC"""
def GetProduct(self, request, context):
"""Unary RPC: 1 request → 1 response"""
product_id = request.id
if product_id not in products:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details('Product not found')
return products_pb2.Product()
p = products[product_id]
return products_pb2.Product(
id=p['id'],
name=p['name'],
price=p['price'],
stock=p['stock']
)
def ListProducts(self, request, context):
"""Unary RPC with filters"""
filtered = list(products.values())
if request.price_min > 0:
filtered = [p for p in filtered if p['price'] >= request.price_min]
if request.price_max > 0:
filtered = [p for p in filtered if p['price'] <= request.price_max]
# Pagination (proto3 ints default to 0, so guard page and limit)
page = max(request.page, 1)
limit = request.limit or 10
start = (page - 1) * limit
end = start + limit
paginated = filtered[start:end]
product_messages = [
products_pb2.Product(
id=p['id'],
name=p['name'],
price=p['price'],
stock=p['stock']
)
for p in paginated
]
return products_pb2.ProductList(
products=product_messages,
total=len(filtered)
)
def CreateProduct(self, request, context):
"""Unary RPC para crear producto"""
product_id = str(len(products) + 1)
product = {
'id': product_id,
'name': request.name,
'price': request.price,
'stock': request.stock
}
products[product_id] = product
return products_pb2.Product(
id=product_id,
name=request.name,
price=request.price,
stock=request.stock
)
def StreamOrders(self, request, context):
"""Server-streaming RPC: 1 request → N responses"""
user_id = request.user_id
# Simulate a stream of orders
for i in range(5):
time.sleep(1)  # simulate delay
yield products_pb2.Order(
id=f"order_{i}",
user_id=user_id,
total=99.99 * (i + 1),
status='processing'
)
def serve():
"""Iniciar servidor gRPC"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
products_pb2_grpc.add_ProductServiceServicer_to_server(
ProductServicer(), server
)
server.add_insecure_port('[::]:50051')
server.start()
print("gRPC server started on port 50051")
server.wait_for_termination()
# ===== CLIENT =====
def run_client():
"""Cliente gRPC"""
channel = grpc.insecure_channel('localhost:50051')
stub = products_pb2_grpc.ProductServiceStub(channel)
# GetProduct
response = stub.GetProduct(products_pb2.ProductRequest(id='1'))
print(f"Product: {response.name}, ${response.price}")
# ListProducts
response = stub.ListProducts(products_pb2.ListProductsRequest(
page=1,
limit=10,
price_min=20.0,
price_max=100.0
))
print(f"Found {response.total} products:")
for product in response.products:
print(f" - {product.name}: ${product.price}")
# CreateProduct
new_product = stub.CreateProduct(products_pb2.CreateProductRequest(
name='Monitor',
price=299.99,
stock=15
))
print(f"Created product: {new_product.id}")
# StreamOrders (server streaming)
print("\nStreaming orders:")
for order in stub.StreamOrders(products_pb2.StreamOrdersRequest(user_id='user_123')):
print(f" Order {order.id}: ${order.total} - {order.status}")
if __name__ == '__main__':
# Run server in background, then client
import threading
server_thread = threading.Thread(target=serve, daemon=True)
server_thread.start()
time.sleep(2)
run_client()
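In production you would also set deadlines so a slow server cannot hang the client indefinitely; grpc's Python stubs accept a timeout keyword on every call. A small sketch (the helper is illustrative):
def get_product_with_deadline(stub, product_id: str):
    """Fail fast instead of waiting forever on a slow or dead server"""
    try:
        # timeout is the per-call deadline in seconds
        return stub.GetProduct(products_pb2.ProductRequest(id=product_id), timeout=2.0)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            print("Deadline exceeded, consider a fallback or retry")
        raise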
When to use each one:
REST:
- ✅ Public APIs
- ✅ Simple CRUD
- ✅ When HTTP caching matters
- ✅ Teams without GraphQL/gRPC experience
GraphQL:
- ✅ The frontend needs query flexibility
- ✅ Multiple clients with different needs
- ✅ Avoiding over/under-fetching
- ❌ Simple public APIs (REST is simpler)
gRPC:
- ✅ Internal microservices
- ✅ Performance-critical paths
- ✅ Bidirectional streaming
- ✅ Polyglot (multi-language) environments
- ❌ Browsers (limited; requires gRPC-Web)
Recipe 2.9: Resilience - Circuit Breakers and Retries
Circuit Breaker Pattern:
from enum import Enum
from datetime import datetime, timedelta
import time
import random
class CircuitState(Enum):
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Failing: reject requests
    HALF_OPEN = "half_open"  # Testing: let a few requests through
class CircuitBreaker:
    """
    Circuit breaker to guard against cascading failures.

    States:
    - CLOSED: operating normally
    - OPEN: too many failures, reject requests
    - HALF_OPEN: attempt gradual recovery
    """
    def __init__(
        self,
        failure_threshold=5,   # failures before opening
        timeout=60,            # seconds to stay OPEN
        success_threshold=2    # successes to close from HALF_OPEN
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    def call(self, func, *args, **kwargs):
        """Run a function through the circuit breaker"""
        # If OPEN, check whether it is time to try HALF_OPEN
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("🔄 Circuit breaker: OPEN → HALF_OPEN (attempting recovery)")
            else:
                raise Exception(f"Circuit breaker is OPEN (retry in {self._time_until_retry():.0f}s)")
        try:
            # Attempt the call
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise
    def _on_success(self):
        """Handle a successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            print(f"✓ Success in HALF_OPEN ({self.success_count}/{self.success_threshold})")
            if self.success_count >= self.success_threshold:
                self._reset()
        else:
            # In CLOSED, a success resets the failure count
            self.failure_count = 0

    def _on_failure(self):
        """Handle a failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        print(f"✗ Failure ({self.failure_count}/{self.failure_threshold})")
        if self.state == CircuitState.HALF_OPEN:
            # Any failure during recovery sends us straight back to OPEN
            self.state = CircuitState.OPEN
            self.success_count = 0
            print("⚠️ Circuit breaker: HALF_OPEN → OPEN (failure during recovery)")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print("⚠️ Circuit breaker: CLOSED → OPEN (too many failures)")
    def _reset(self):
        """Reset the circuit breaker"""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        print("✓ Circuit breaker: HALF_OPEN → CLOSED (recovered)")

    def _should_attempt_reset(self):
        """Check whether it is time to attempt recovery"""
        if not self.last_failure_time:
            return True
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout

    def _time_until_retry(self):
        """Time remaining until recovery is attempted"""
        if not self.last_failure_time:
            return 0
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return max(0, self.timeout - elapsed)
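A small convenience on top of the class above: wrapping a function once as a decorator, so call sites stay clean (this helper is illustrative, not part of any standard library):
from functools import wraps

def circuit_protected(breaker: CircuitBreaker):
    """Decorator that routes every call through the given breaker"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return breaker.call(func, *args, **kwargs)
        return wrapper
    return decorator

# Usage:
# @circuit_protected(CircuitBreaker(failure_threshold=3, timeout=5))
# def fetch_inventory():
#     ...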
# ===== RETRY WITH EXPONENTIAL BACKOFF =====
def retry_with_backoff(
    func,
    max_retries=3,
    initial_delay=1,
    max_delay=60,
    backoff_factor=2,
    exceptions=(Exception,)
):
    """
    Retry with exponential backoff and jitter.
    Delays: 1s, 2s, 4s, 8s, ...
    Jitter: add randomness to avoid a thundering herd.
    """
    delay = initial_delay
    for attempt in range(max_retries + 1):
        try:
            return func()
        except exceptions as e:
            if attempt == max_retries:
                print(f"❌ Failed after {max_retries + 1} attempts")
                raise
            # Sleep the current delay (plus jitter), then grow it exponentially
            jitter = random.uniform(0, delay * 0.1)  # 10% jitter
            sleep_time = delay + jitter
            print(f"⚠️ Attempt {attempt + 1} failed: {e}")
            print(f"   Retrying in {sleep_time:.2f}s...")
            time.sleep(sleep_time)
            delay = min(delay * backoff_factor, max_delay)
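In real projects you rarely hand-roll this loop; the tenacity library implements the same policy declaratively. A sketch, assuming tenacity is installed (pip install tenacity):
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(4),                          # 1 initial try + 3 retries
    wait=wait_exponential(multiplier=1, min=1, max=60),  # 1s, 2s, 4s, ... capped at 60s
)
def call_flaky_service():
    """tenacity re-invokes this function according to the policy above"""
    return unreliable_service()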
# ===== DEMO =====
def unreliable_service():
"""Simulate a service that fails randomly"""
if random.random() < 0.7:  # 70% chance of failure
raise Exception("Service unavailable")
return "Success!"
print("=== Circuit Breaker Demo ===\n")
cb = CircuitBreaker(
failure_threshold=3,
timeout=5,
success_threshold=2
)
# Make several calls through the breaker
for i in range(20):
try:
result = cb.call(unreliable_service)
print(f"✓ Request {i+1}: {result}\n")
except Exception as e:
print(f"✗ Request {i+1}: {e}\n")
time.sleep(1)
print("\n=== Retry with Backoff Demo ===\n")
attempt_count = [0]
def flaky_service():
"""Fails on the first two attempts, then succeeds"""
attempt_count[0] += 1
if attempt_count[0] < 3:
raise Exception(f"Temporary failure (attempt {attempt_count[0]})")
return "Success after retries!"
try:
result = retry_with_backoff(
flaky_service,
max_retries=5,
initial_delay=0.5,
backoff_factor=2
)
print(f"\n✓ Final result: {result}")
except Exception as e:
print(f"\n❌ Final error: {e}")
Timeout and Fallback:
import signal
from contextlib import contextmanager
class TimeoutError(Exception):  # note: shadows Python's built-in TimeoutError
    pass
@contextmanager
def timeout(seconds):
    """Timeout context manager (SIGALRM: Unix-only, main thread only)"""
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")
    # Install the signal handler and arm the alarm
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Disarm the alarm and restore the previous handler
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
def call_with_fallback(primary_func, fallback_func, timeout_seconds=5):
"""Call a function, falling back if it fails or times out"""
try:
with timeout(timeout_seconds):
return primary_func()
except (Exception, TimeoutError) as e:
print(f"⚠️ Primary failed: {e}, using fallback")
return fallback_func()
# Usage
def slow_external_api():
time.sleep(10)  # deliberately far too slow
return {"data": "from API"}
def cache_fallback():
return {"data": "from cache (stale)"}
result = call_with_fallback(
slow_external_api,
cache_fallback,
timeout_seconds=2
)
print(f"Result: {result}")
Congratulations! 🎉
You have completed PHASE 2: Architecture and System Design of the roadmap.
What you have learned:
✅ Monoliths vs microservices
✅ Event-driven architecture and CQRS
✅ The CAP theorem and eventual consistency
✅ Load balancing and caching strategies
✅ Advanced SQL and query optimization
✅ NoSQL (MongoDB, Redis)
✅ REST vs GraphQL vs gRPC
✅ Resilience with circuit breakers
Next steps:
PHASE 3: Infrastructure and DevOps
- Containers and Kubernetes
- Cloud computing
- CI/CD pipelines
- Infrastructure as Code
Version: 1.0
Date: 2024
Author: Roadmap del Desarrollador del Futuro
License: Educational use