🏗️ Developer Cookbook - PHASE 2: System Architecture and Design

Practical recipes for designing scalable, robust systems


📚 Table of Contents

  1. Scalable Software Design
  2. Databases
  3. APIs and Inter-Service Communication

Scalable Software Design

Recipe 2.1: Monolith vs Microservices - When to Use Each?

What are they? A monolith ships the entire application as a single deployable unit; microservices split it into small, independently deployable services that communicate over the network.

Comparison:

| Characteristic | Monolith | Microservices |
| --- | --- | --- |
| Initial complexity | 🟢 Low | 🔴 High |
| Deployment | Simple (1 deploy) | Complex (N deploys) |
| Scalability | Vertical (whole app) | Horizontal per service |
| Testing | 🟢 Easy E2E | 🔴 Hard E2E |
| Initial development | 🟢 Fast | 🔴 Slow |
| Operations | Simple | Requires DevOps/SRE |
| Debugging | 🟢 Easy | 🔴 Distributed tracing |
| Technology | Single stack | Polyglot |

Use case: eCommerce

# ===== MONOLITH =====
from flask import Flask, request, jsonify
from datetime import datetime
import sqlite3

app = Flask(__name__)

class MonolithEcommerce:
    """Todo en una aplicación"""
    
    def __init__(self):
        self.db = sqlite3.connect('ecommerce.db', check_same_thread=False)
        self._init_db()
    
    def _init_db(self):
        """Todas las tablas en una DB"""
        cursor = self.db.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                email TEXT,
                name TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT,
                price REAL,
                stock INTEGER
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                total REAL,
                status TEXT,
                created_at TEXT
            )
        ''')
        self.db.commit()
    
    def create_order(self, user_id, product_ids):
        """Todo el flujo de orden en un lugar"""
        cursor = self.db.cursor()
        
        # 1. Validate the user
        cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
        user = cursor.fetchone()
        if not user:
            return {'error': 'User not found'}, 404
        
        # 2. Compute the total and validate stock
        total = 0
        for product_id in product_ids:
            cursor.execute('SELECT price, stock FROM products WHERE id = ?', (product_id,))
            product = cursor.fetchone()
            if not product or product[1] < 1:
                return {'error': f'Product {product_id} unavailable'}, 400
            total += product[0]
        
        # 3. Create the order
        cursor.execute('''
            INSERT INTO orders (user_id, total, status, created_at)
            VALUES (?, ?, ?, ?)
        ''', (user_id, total, 'pending', datetime.now().isoformat()))
        order_id = cursor.lastrowid  # capture before running the UPDATEs below
        
        # 4. Decrement stock
        for product_id in product_ids:
            cursor.execute('''
                UPDATE products SET stock = stock - 1 WHERE id = ?
            ''', (product_id,))
        
        # 5. Send email (simulated)
        self._send_order_email(user[1], total)
        
        self.db.commit()
        return {'order_id': order_id, 'total': total}, 201
    
    def _send_order_email(self, email, total):
        """Email integrado en el monolito"""
        print(f"📧 Sending order confirmation to {email} for ${total}")

monolith = MonolithEcommerce()

@app.route('/orders', methods=['POST'])
def create_order():
    data = request.json
    result, status = monolith.create_order(
        data['user_id'],
        data['product_ids']
    )
    return jsonify(result), status

# ===== MICROSERVICES =====
import requests
from flask import Flask, request, jsonify

# Service 1: Users
user_service = Flask('users')
users_db = {}

@user_service.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = users_db.get(user_id)
    if not user:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(user), 200

# Service 2: Products/Inventory
product_service = Flask('products')
products_db = {
    1: {'name': 'Laptop', 'price': 999.99, 'stock': 10},
    2: {'name': 'Mouse', 'price': 29.99, 'stock': 50}
}

@product_service.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    product = products_db.get(product_id)
    if not product:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(product), 200

@product_service.route('/products/<int:product_id>/reserve', methods=['POST'])
def reserve_stock(product_id):
    """Reservar stock (inventory management)"""
    product = products_db.get(product_id)
    if not product or product['stock'] < 1:
        return jsonify({'error': 'Out of stock'}), 400
    
    product['stock'] -= 1
    return jsonify({'reserved': True}), 200

@product_service.route('/products/<int:product_id>/release', methods=['POST'])
def release_stock(product_id):
    """Release a reservation (used by the saga rollback below)"""
    product = products_db.get(product_id)
    if product:
        product['stock'] += 1
    return jsonify({'released': True}), 200

# Service 3: Orders (orchestrator)
order_service = Flask('orders')
orders_db = {}

class OrderOrchestrator:
    """Coordina llamadas entre microservicios"""
    
    USER_SERVICE_URL = "http://localhost:5001"
    PRODUCT_SERVICE_URL = "http://localhost:5002"
    NOTIFICATION_SERVICE_URL = "http://localhost:5003"
    
    def create_order(self, user_id, product_ids):
        """Flujo distribuido con compensaciones"""
        try:
            # 1. Validate the user (call to user-service)
            user_response = requests.get(
                f"{self.USER_SERVICE_URL}/users/{user_id}",
                timeout=2
            )
            if user_response.status_code != 200:
                return {'error': 'Invalid user'}, 400
            
            user = user_response.json()
            
            # 2. Reserve products (calls to product-service)
            total = 0
            reserved_products = []
            
            for product_id in product_ids:
                # Fetch the product
                product_response = requests.get(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}",
                    timeout=2
                )
                if product_response.status_code != 200:
                    # Compensate: release the already-reserved products
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} not found'}, 400
                
                product = product_response.json()
                total += product['price']
                
                # Reserve stock
                reserve_response = requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/reserve",
                    timeout=2
                )
                if reserve_response.status_code != 200:
                    # Compensate
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} out of stock'}, 400
                
                reserved_products.append(product_id)
            
            # 3. Create the order
            order_id = len(orders_db) + 1
            orders_db[order_id] = {
                'id': order_id,
                'user_id': user_id,
                'products': product_ids,
                'total': total,
                'status': 'confirmed',
                'created_at': datetime.now().isoformat()
            }
            
            # 4. Notify (async, fire-and-forget)
            try:
                requests.post(
                    f"{self.NOTIFICATION_SERVICE_URL}/send",
                    json={
                        'email': user['email'],
                        'template': 'order_confirmation',
                        'data': {'order_id': order_id, 'total': total}
                    },
                    timeout=1
                )
            except requests.RequestException:
                # Don't fail the order if the email fails
                print("⚠️  Notification failed, but order created")
            
            return {'order_id': order_id, 'total': total}, 201
        
        except requests.Timeout:
            return {'error': 'Service timeout'}, 504
        except requests.ConnectionError:
            return {'error': 'Service unavailable'}, 503
    
    def _rollback_reservations(self, product_ids):
        """Saga pattern: Compensar transacciones"""
        for product_id in product_ids:
            try:
                requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/release",
                    timeout=1
                )
            except requests.RequestException:
                print(f"⚠️  Failed to rollback product {product_id}")

orchestrator = OrderOrchestrator()

@order_service.route('/orders', methods=['POST'])
def create_order_micro():
    data = request.json
    result, status = orchestrator.create_order(
        data['user_id'],
        data['product_ids']
    )
    return jsonify(result), status

# Service 4: Notifications (async)
notification_service = Flask('notifications')

@notification_service.route('/send', methods=['POST'])
def send_notification():
    """Servicio independiente de notificaciones"""
    data = request.json
    print(f"📧 Sending {data['template']} to {data['email']}")
    # A real integration would call SendGrid, SES, etc.
    return jsonify({'sent': True}), 200

When to use a Monolith:

  - Small team, early-stage product, or MVP
  - Domain boundaries are still unclear
  - Simple deployment, testing, and debugging matter more than independent scaling

When to use Microservices:

  - Several teams need to develop and deploy independently
  - Components have very different scaling or technology needs
  - Mature DevOps/SRE practices (CI/CD, observability) are already in place

⚠️ The golden rule:

“Start with a monolith, evolve to microservices when pain points are clear”
— Martin Fowler
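
One pragmatic way to follow that rule is a "modular monolith": a single deployable whose modules only talk through explicit interfaces, so a module can later be extracted as a service. A minimal sketch, with illustrative module and class names that are not part of the recipes above:

# A modular monolith: one deployable, but the orders module may only reach
# user data through an explicit boundary interface, never its tables.
from abc import ABC, abstractmethod

class UserRepository(ABC):
    """Boundary interface between the orders and users modules"""
    @abstractmethod
    def get_user(self, user_id: int) -> dict: ...

class SqlUserRepository(UserRepository):
    """Today: reads the local users table"""
    def __init__(self, db):
        self.db = db
    
    def get_user(self, user_id: int) -> dict:
        row = self.db.execute(
            'SELECT id, email FROM users WHERE id = ?', (user_id,)).fetchone()
        return {'id': row[0], 'email': row[1]} if row else None

class OrderModule:
    """Depends on the interface; swapping in an HTTP-backed repository later
    turns users into a separate service without touching this module"""
    def __init__(self, users: UserRepository):
        self.users = users
    
    def create_order(self, user_id: int, product_ids: list):
        if self.users.get_user(user_id) is None:
            return {'error': 'User not found'}
        return {'status': 'pending', 'products': product_ids}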


Recipe 2.2: Event-Driven Architecture

What is it? An architecture in which components communicate through events (messages) instead of direct synchronous calls.

Advantages:

  - Loose coupling: publishers don't know who consumes their events
  - Producers and consumers scale and fail independently
  - New consumers can be added without touching existing services
  - The event log doubles as an audit trail

Key components:

from dataclasses import dataclass
from datetime import datetime
from typing import List, Callable
import json
from queue import Queue
import threading

# ===== Event Bus =====
@dataclass(frozen=True)
class Event:
    """Immutable event"""
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime
    metadata: dict = None
    
    def to_json(self):
        return {
            'event_type': self.event_type,
            'aggregate_id': self.aggregate_id,
            'data': self.data,
            'timestamp': self.timestamp.isoformat(),
            'metadata': self.metadata or {}
        }

class EventBus:
    """Bus de eventos in-memory (en producción: Kafka, RabbitMQ, SQS)"""
    
    def __init__(self):
        self.subscribers = {}  # {event_type: [handlers]}
        self.event_log = []    # Append-only log of every event
    
    def publish(self, event: Event):
        """Publish an event"""
        print(f"📢 Published: {event.event_type}")
        self.event_log.append(event)
        
        # Notify subscribers; in production this would go through a queue (async)
        handlers = self.subscribers.get(event.event_type, [])
        for handler in handlers:
            def run(h=handler):
                try:
                    h(event)
                except Exception as e:
                    print(f"❌ Handler error: {e}")
            threading.Thread(target=run).start()
    
    def subscribe(self, event_type: str, handler: Callable):
        """Suscribirse a tipo de evento"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        print(f"✓ Subscribed to {event_type}")
    
    def replay_events(self, aggregate_id: str):
        """Reconstruir estado desde eventos (Event Sourcing)"""
        return [e for e in self.event_log if e.aggregate_id == aggregate_id]

# ===== Example: Order system =====
event_bus = EventBus()

# Service 1: Order Service (publishes events)
class OrderService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}
    
    def create_order(self, order_id: str, user_id: str, items: List[dict]):
        """Crear orden y publicar evento"""
        order = {
            'id': order_id,
            'user_id': user_id,
            'items': items,
            'status': 'created',
            'total': sum(item['price'] * item['qty'] for item in items)
        }
        self.orders[order_id] = order
        
        # Publish an event (don't call other services directly)
        self.event_bus.publish(Event(
            event_type='OrderCreated',
            aggregate_id=order_id,
            data=order,
            timestamp=datetime.now(),
            metadata={'user_id': user_id}
        ))
        
        return order
    
    def confirm_payment(self, order_id: str):
        """Confirmar pago"""
        order = self.orders[order_id]
        order['status'] = 'paid'
        
        self.event_bus.publish(Event(
            event_type='OrderPaid',
            aggregate_id=order_id,
            data={'order_id': order_id, 'total': order['total']},
            timestamp=datetime.now()
        ))

# Service 2: Inventory Service (reacts to events)
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.inventory = {
            'laptop': 10,
            'mouse': 50
        }
        
        # Subscribe to the relevant events
        event_bus.subscribe('OrderCreated', self.handle_order_created)
    
    def handle_order_created(self, event: Event):
        """Reservar stock cuando se crea orden"""
        print(f"\n🏪 InventoryService processing {event.event_type}")
        
        items = event.data['items']
        order_id = event.aggregate_id
        
        # Validate and reserve stock
        for item in items:
            product = item['product']
            qty = item['qty']
            
            if self.inventory.get(product, 0) < qty:
                # Publish a failure event
                self.event_bus.publish(Event(
                    event_type='InventoryReservationFailed',
                    aggregate_id=order_id,
                    data={'product': product, 'requested': qty},
                    timestamp=datetime.now()
                ))
                return
            
            self.inventory[product] -= qty
        
        # Success: publish the event
        self.event_bus.publish(Event(
            event_type='InventoryReserved',
            aggregate_id=order_id,
            data={'items': items},
            timestamp=datetime.now()
        ))

# Service 3: Payment Service
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        event_bus.subscribe('InventoryReserved', self.handle_inventory_reserved)
    
    def handle_inventory_reserved(self, event: Event):
        """Process payment once stock is reserved"""
        print(f"\n💳 PaymentService processing {event.event_type}")
        
        order_id = event.aggregate_id
        
        # Simulate payment processing
        success = True  # Real code would call Stripe, etc.
        
        if success:
            self.event_bus.publish(Event(
                event_type='PaymentSucceeded',
                aggregate_id=order_id,
                data={'payment_id': 'pay_123'},
                timestamp=datetime.now()
            ))
        else:
            self.event_bus.publish(Event(
                event_type='PaymentFailed',
                aggregate_id=order_id,
                data={'reason': 'Insufficient funds'},
                timestamp=datetime.now()
            ))

# Service 4: Notification Service
class NotificationService:
    def __init__(self, event_bus: EventBus):
        event_bus.subscribe('OrderCreated', self.send_order_confirmation)
        event_bus.subscribe('PaymentSucceeded', self.send_payment_confirmation)
        event_bus.subscribe('InventoryReservationFailed', self.send_failure_notification)
    
    def send_order_confirmation(self, event: Event):
        print(f"\n📧 Sending order confirmation for {event.aggregate_id}")
    
    def send_payment_confirmation(self, event: Event):
        print(f"\n📧 Sending payment confirmation for {event.aggregate_id}")
    
    def send_failure_notification(self, event: Event):
        print(f"\n📧 Sending failure notification for {event.aggregate_id}")

# ===== Demo =====
print("=== Event-Driven Architecture Demo ===\n")

# Initialize the services
order_service = OrderService(event_bus)
inventory_service = InventoryService(event_bus)
payment_service = PaymentService(event_bus)
notification_service = NotificationService(event_bus)

# Create an order (this triggers the whole chain)
import time
order = order_service.create_order(
    order_id='order_001',
    user_id='user_123',
    items=[
        {'product': 'laptop', 'price': 999, 'qty': 1},
        {'product': 'mouse', 'price': 29, 'qty': 2}
    ]
)

# Wait for the events to be processed (async)
time.sleep(0.5)

print("\n=== Event Log ===")
for event in event_bus.event_log:
    print(f"- {event.event_type} @ {event.timestamp.strftime('%H:%M:%S')}")

# Rebuild state from events (Event Sourcing)
print("\n=== Replay Events for order_001 ===")
events = event_bus.replay_events('order_001')
for event in events:
    print(f"- {event.event_type}: {event.data}")

Messaging patterns:

# 1. PUBLISH-SUBSCRIBE (1:N)
# One event, multiple consumers (these handlers are illustrative placeholders)
event_bus.subscribe('UserRegistered', send_welcome_email)
event_bus.subscribe('UserRegistered', create_default_preferences)
event_bus.subscribe('UserRegistered', send_to_analytics)

# 2. POINT-TO-POINT (1:1)
# Task queue: one message, one consumer
task_queue = Queue()
task_queue.put({'task': 'send_email', 'to': 'user@example.com'})

# Workers compete for tasks (process_task is a placeholder)
def worker():
    while True:
        task = task_queue.get()
        process_task(task)
        task_queue.task_done()

# 3. REQUEST-REPLY (simulated synchronous)
# The client waits for the response
def request_with_correlation():
    correlation_id = 'req_123'
    
    # Publish the request
    event_bus.publish(Event(
        event_type='GetUserRequest',
        aggregate_id=correlation_id,
        data={'user_id': '456'},
        timestamp=datetime.now()
    ))
    
    # Wait for the response (normally with a timeout)
    # In production: use a reply queue keyed by correlation_id (see the sketch below)
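
The request-reply sketch above never actually blocks for the answer. One minimal way to close the loop with this recipe's in-memory EventBus is a per-request reply queue keyed by correlation id; handle_get_user and its stubbed lookup below are assumptions for illustration, not part of the original example:

# Request-reply over the in-memory EventBus: the caller blocks on a
# per-request reply queue; the responder routes the answer back by
# correlation id (handle_get_user and its stub lookup are illustrative).
from queue import Empty

reply_queues = {}  # {correlation_id: Queue}

def handle_get_user(event: Event):
    """Responder: look up the user (stubbed) and route the reply"""
    reply = {'user_id': event.data['user_id'], 'name': 'Alice'}  # stub lookup
    q = reply_queues.get(event.aggregate_id)
    if q:
        q.put(reply)

event_bus.subscribe('GetUserRequest', handle_get_user)

def request_user(user_id: str, timeout: float = 2.0):
    """Requester: publish, then block until the reply arrives (or timeout)"""
    correlation_id = f'req_{user_id}'
    reply_queues[correlation_id] = Queue()
    event_bus.publish(Event(
        event_type='GetUserRequest',
        aggregate_id=correlation_id,
        data={'user_id': user_id},
        timestamp=datetime.now()
    ))
    try:
        return reply_queues[correlation_id].get(timeout=timeout)  # block here
    except Empty:
        return None  # no reply in time
    finally:
        del reply_queues[correlation_id]

print(request_user('456'))  # {'user_id': '456', 'name': 'Alice'}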

Event-driven technologies:

  - Apache Kafka: distributed log with high throughput and event replay
  - RabbitMQ: mature broker with flexible routing
  - AWS SQS/SNS: managed queues and pub/sub
  - Redis Streams: lightweight streams on top of Redis


Recipe 2.3: CQRS and Event Sourcing

What is CQRS? Command Query Responsibility Segregation: separating the write model (Commands) from the read model (Queries).

What is Event Sourcing? Storing state changes as a sequence of events instead of only the current state.

Use case: banking system

from typing import List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

# ===== EVENT SOURCING =====
class EventType(Enum):
    ACCOUNT_CREATED = "account_created"
    MONEY_DEPOSITED = "money_deposited"
    MONEY_WITHDRAWN = "money_withdrawn"
    ACCOUNT_FROZEN = "account_frozen"

@dataclass
class DomainEvent:
    """Evento de dominio inmutable"""
    event_id: str
    event_type: EventType
    aggregate_id: str  # Account ID
    data: dict
    timestamp: datetime = field(default_factory=datetime.now)
    version: int = 1

class EventStore:
    """Store de eventos (en producción: EventStoreDB, PostgreSQL)"""
    
    def __init__(self):
        self.events = []  # In production: an append-only DB table
    
    def append(self, event: DomainEvent):
        """Agregar evento (inmutable, solo append)"""
        self.events.append(event)
        print(f"✓ Event stored: {event.event_type.value} v{event.version}")
    
    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        """Obtener todos los eventos de un agregado"""
        return [e for e in self.events if e.aggregate_id == aggregate_id]
    
    def get_events_from_version(self, aggregate_id: str, from_version: int):
        """Obtener eventos desde cierta versión (para snapshots)"""
        return [
            e for e in self.events 
            if e.aggregate_id == aggregate_id and e.version >= from_version
        ]

# ===== WRITE MODEL (Commands) =====
class AccountAggregate:
    """Agregado que reconstruye estado desde eventos"""
    
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0
        self.status = 'inactive'
        self.version = 0
        self.uncommitted_events = []
    
    def create_account(self, owner: str, initial_deposit: float):
        """Command: Crear cuenta"""
        if self.status != 'inactive':
            raise ValueError("Account already exists")
        
        event = DomainEvent(
            event_id=f"evt_{self.version + 1}",
            event_type=EventType.ACCOUNT_CREATED,
            aggregate_id=self.account_id,
            data={'owner': owner, 'initial_deposit': initial_deposit},
            version=self.version + 1
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def deposit(self, amount: float):
        """Command: Depositar"""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        
        event = DomainEvent(
            event_id=f"evt_{self.version + 1}",
            event_type=EventType.MONEY_DEPOSITED,
            aggregate_id=self.account_id,
            data={'amount': amount},
            version=self.version + 1
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def withdraw(self, amount: float):
        """Command: Retirar"""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        if self.status == 'frozen':
            raise ValueError("Account is frozen")
        
        event = DomainEvent(
            event_id=f"evt_{self.version + 1}",
            event_type=EventType.MONEY_WITHDRAWN,
            aggregate_id=self.account_id,
            data={'amount': amount},
            version=self.version + 1
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def _apply_event(self, event: DomainEvent):
        """Aplicar evento al estado (Event Sourcing)"""
        if event.event_type == EventType.ACCOUNT_CREATED:
            self.status = 'active'
            self.balance = event.data['initial_deposit']
        
        elif event.event_type == EventType.MONEY_DEPOSITED:
            self.balance += event.data['amount']
        
        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self.balance -= event.data['amount']
        
        elif event.event_type == EventType.ACCOUNT_FROZEN:
            self.status = 'frozen'
        
        self.version = event.version
    
    def load_from_history(self, events: List[DomainEvent]):
        """Reconstruir estado desde eventos (Event Sourcing)"""
        for event in events:
            self._apply_event(event)
    
    def get_uncommitted_events(self):
        """Obtener eventos pendientes de guardar"""
        events = self.uncommitted_events
        self.uncommitted_events = []
        return events

# ===== READ MODEL (Queries) =====
class AccountReadModel:
    """Modelo optimizado para lecturas (CQRS)"""
    
    def __init__(self):
        # Different projections for different queries
        self.accounts_by_id = {}
        self.accounts_by_balance = []  # Sorted by balance
        self.transaction_history = {}  # {account_id: [transactions]}
    
    def project_event(self, event: DomainEvent):
        """Actualizar proyección cuando llega evento"""
        account_id = event.aggregate_id
        
        if event.event_type == EventType.ACCOUNT_CREATED:
            self.accounts_by_id[account_id] = {
                'id': account_id,
                'owner': event.data['owner'],
                'balance': event.data['initial_deposit'],
                'status': 'active',
                'created_at': event.timestamp
            }
            self.transaction_history[account_id] = []
        
        elif event.event_type == EventType.MONEY_DEPOSITED:
            self.accounts_by_id[account_id]['balance'] += event.data['amount']
            self.transaction_history[account_id].append({
                'type': 'deposit',
                'amount': event.data['amount'],
                'timestamp': event.timestamp,
                'balance_after': self.accounts_by_id[account_id]['balance']
            })
        
        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self.accounts_by_id[account_id]['balance'] -= event.data['amount']
            self.transaction_history[account_id].append({
                'type': 'withdrawal',
                'amount': event.data['amount'],
                'timestamp': event.timestamp,
                'balance_after': self.accounts_by_id[account_id]['balance']
            })
        
        # Refresh the balance-sorted index
        self._rebuild_balance_index()
    
    def _rebuild_balance_index(self):
        """Reconstruir índice ordenado"""
        self.accounts_by_balance = sorted(
            self.accounts_by_id.values(),
            key=lambda x: x['balance'],
            reverse=True
        )
    
    def get_account(self, account_id: str):
        """Query: Obtener cuenta por ID - O(1)"""
        return self.accounts_by_id.get(account_id)
    
    def get_richest_accounts(self, limit: int = 10):
        """Query: Top cuentas por balance - O(1) por índice pre-calculado"""
        return self.accounts_by_balance[:limit]
    
    def get_transaction_history(self, account_id: str):
        """Query: Historial de transacciones - O(1)"""
        return self.transaction_history.get(account_id, [])

# ===== COMMAND HANDLER =====
class BankingCommandHandler:
    """Maneja commands y persiste eventos"""
    
    def __init__(self, event_store: EventStore, read_model: AccountReadModel):
        self.event_store = event_store
        self.read_model = read_model
    
    def handle_create_account(self, account_id: str, owner: str, initial_deposit: float):
        """Command Handler: Crear cuenta"""
        # 1. Cargar agregado desde eventos
        account = AccountAggregate(account_id)
        events = self.event_store.get_events(account_id)
        account.load_from_history(events)
        
        # 2. Execute the command
        account.create_account(owner, initial_deposit)
        
        # 3. Persist the events
        for event in account.get_uncommitted_events():
            self.event_store.append(event)
            # 4. Update the read model (eventual consistency)
            self.read_model.project_event(event)
    
    def handle_deposit(self, account_id: str, amount: float):
        """Command Handler: Depositar"""
        account = AccountAggregate(account_id)
        events = self.event_store.get_events(account_id)
        account.load_from_history(events)
        
        account.deposit(amount)
        
        for event in account.get_uncommitted_events():
            self.event_store.append(event)
            self.read_model.project_event(event)
    
    def handle_withdraw(self, account_id: str, amount: float):
        """Command Handler: Retirar"""
        account = AccountAggregate(account_id)
        events = self.event_store.get_events(account_id)
        account.load_from_history(events)
        
        account.withdraw(amount)
        
        for event in account.get_uncommitted_events():
            self.event_store.append(event)
            self.read_model.project_event(event)

# ===== QUERY HANDLER =====
class BankingQueryHandler:
    """Maneja queries (solo lectura)"""
    
    def __init__(self, read_model: AccountReadModel):
        self.read_model = read_model
    
    def get_account_balance(self, account_id: str):
        """Query: Balance actual"""
        account = self.read_model.get_account(account_id)
        return account['balance'] if account else None
    
    def get_account_details(self, account_id: str):
        """Query: Detalles completos"""
        return self.read_model.get_account(account_id)
    
    def get_top_balances(self):
        """Query: Top 10 cuentas"""
        return self.read_model.get_richest_accounts(10)
    
    def get_transactions(self, account_id: str):
        """Query: Historial"""
        return self.read_model.get_transaction_history(account_id)

# ===== DEMO =====
print("=== CQRS + Event Sourcing Demo ===\n")

event_store = EventStore()
read_model = AccountReadModel()
command_handler = BankingCommandHandler(event_store, read_model)
query_handler = BankingQueryHandler(read_model)

# COMMANDS (Write)
print("--- COMMANDS ---")
command_handler.handle_create_account('ACC001', 'Alice', 1000.0)
command_handler.handle_deposit('ACC001', 500.0)
command_handler.handle_withdraw('ACC001', 200.0)
command_handler.handle_deposit('ACC001', 1000.0)

command_handler.handle_create_account('ACC002', 'Bob', 5000.0)
command_handler.handle_deposit('ACC002', 2000.0)

# QUERIES (Read)
print("\n--- QUERIES ---")
balance = query_handler.get_account_balance('ACC001')
print(f"Alice's balance: ${balance}")

details = query_handler.get_account_details('ACC001')
print(f"Alice's account: {details}")

top_accounts = query_handler.get_top_balances()
print(f"\nTop accounts:")
for acc in top_accounts:
    print(f"  {acc['owner']}: ${acc['balance']}")

transactions = query_handler.get_transactions('ACC001')
print(f"\nAlice's transactions:")
for tx in transactions:
    print(f"  {tx['type']}: ${tx['amount']} -> Balance: ${tx['balance_after']}")

# EVENT SOURCING: rebuild state from events
print("\n--- EVENT SOURCING: Rebuild from history ---")
new_account = AccountAggregate('ACC001')
events = event_store.get_events('ACC001')
new_account.load_from_history(events)
print(f"Rebuilt balance: ${new_account.balance} (version {new_account.version})")

print(f"\nTotal events stored: {len(event_store.events)}")

Advantages of CQRS + Event Sourcing:

  - Full audit trail: every state change is an immutable event
  - State can be rebuilt at any point in time (replay)
  - Read models can be optimized independently for each query
  - Fits naturally with event-driven integrations

Disadvantages:

  - Significantly more complex than CRUD
  - Eventual consistency between the write and read models
  - Event schema versioning and migration are hard
  - Steeper learning curve for the team


Recipe 2.4: CAP Theorem and Eventual Consistency

CAP Theorem: in a distributed system, you can only guarantee 2 of these 3:

  - Consistency: every read sees the most recent write
  - Availability: every request receives a (non-error) response
  - Partition tolerance: the system keeps working despite dropped or delayed network messages

Trade-offs:

| System | Prioritizes | Example |
| --- | --- | --- |
| CP | Consistency + Partition tolerance | Traditional banking, ZooKeeper |
| AP | Availability + Partition tolerance | DNS, Cassandra, DynamoDB |
| CA | Consistency + Availability | Single-node RDBMS (not distributed) |

Implementing eventual consistency:

import time
import threading
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class WriteOperation:
    """Operación de escritura con timestamp"""
    key: str
    value: any
    timestamp: float
    node_id: str

class EventuallyConsistentStore:
    """Distributed key-value store with eventual consistency (AP)"""
    
    def __init__(self, node_id: str, replication_factor: int = 3):
        self.node_id = node_id
        self.replication_factor = replication_factor
        self.data: Dict[str, any] = {}
        self.vector_clock: Dict[str, int] = {}  # Per-key version counter (simplified)
        self.last_write_ts: Dict[str, float] = {}  # Per-key timestamp for LWW
        self.write_log: List[WriteOperation] = []
        self.peer_nodes = []
        self.lock = threading.Lock()
    
    def connect_to_peer(self, peer_node):
        """Conectar a otro nodo (cluster)"""
        self.peer_nodes.append(peer_node)
        print(f"✓ {self.node_id} connected to {peer_node.node_id}")
    
    def write(self, key: str, value: any):
        """Write with eventual consistency"""
        with self.lock:
            # 1. Write locally (availability)
            self.data[key] = value
            self.vector_clock[key] = self.vector_clock.get(key, 0) + 1
            
            operation = WriteOperation(
                key=key,
                value=value,
                timestamp=time.time(),
                node_id=self.node_id
            )
            self.last_write_ts[key] = operation.timestamp
            self.write_log.append(operation)
            
            print(f"✓ {self.node_id}: Wrote {key}={value} locally")
        
        # 2. Replicate asynchronously (eventual consistency)
        self._replicate_async(operation)
        
        return True
    
    def _replicate_async(self, operation: WriteOperation):
        """Replicar a otros nodos asíncronamente"""
        def replicate():
            time.sleep(0.1)  # Simulate network delay
            
            for peer in self.peer_nodes[:self.replication_factor - 1]:
                try:
                    peer._receive_replication(operation)
                except Exception as e:
                    print(f"⚠️  Replication failed to {peer.node_id}: {e}")
        
        threading.Thread(target=replicate, daemon=True).start()
    
    def _receive_replication(self, operation: WriteOperation):
        """Receive a replicated write from another node"""
        with self.lock:
            # Conflict resolution: Last-Write-Wins by per-key timestamp
            if operation.timestamp > self.last_write_ts.get(operation.key, 0.0):
                self.data[operation.key] = operation.value
                self.last_write_ts[operation.key] = operation.timestamp
                self.vector_clock[operation.key] = self.vector_clock.get(operation.key, 0) + 1
                print(f"✓ {self.node_id}: Replicated {operation.key}={operation.value} from {operation.node_id}")
    
    def read(self, key: str, consistency_level='ONE'):
        """
        Read with different consistency levels
        
        ONE: read from one node (fastest, may be stale)
        QUORUM: read from a majority (balanced)
        ALL: read from every node (most consistent, slowest)
        """
        if consistency_level == 'ONE':
            # Local read (fast, but possibly stale)
            value = self.data.get(key)
            print(f"🔍 {self.node_id}: Read {key}={value} (ONE)")
            return value
        
        elif consistency_level == 'QUORUM':
            # Read from a majority of nodes
            quorum_size = (len(self.peer_nodes) + 1) // 2 + 1
            responses = [self.data.get(key)]
            
            for peer in self.peer_nodes[:quorum_size - 1]:
                responses.append(peer.data.get(key))
            
            # Resolve: take the most common value (works with unhashable dicts)
            value = max(responses, key=responses.count)
            print(f"🔍 {self.node_id}: Read {key}={value} (QUORUM)")
            return value
        
        elif consistency_level == 'ALL':
            # Read from every node (strong consistency)
            responses = [self.data.get(key)]
            
            for peer in self.peer_nodes:
                responses.append(peer.data.get(key))
            
            # Check that every node agrees (without set(), which can't hash dicts)
            if any(r != responses[0] for r in responses[1:]):
                print(f"⚠️  Inconsistency detected for {key}: {responses}")
            
            value = responses[0]
            print(f"🔍 {self.node_id}: Read {key}={value} (ALL)")
            return value
    
    def get_stats(self):
        """Estadísticas del nodo"""
        return {
            'node_id': self.node_id,
            'keys': len(self.data),
            'writes': len(self.write_log),
            'data': dict(self.data)
        }

# ===== DEMO =====
print("=== Eventual Consistency Demo ===\n")

# Create a 3-node cluster
node1 = EventuallyConsistentStore('Node1')
node2 = EventuallyConsistentStore('Node2')
node3 = EventuallyConsistentStore('Node3')

# Connect the nodes into a cluster
node1.connect_to_peer(node2)
node1.connect_to_peer(node3)
node2.connect_to_peer(node1)
node2.connect_to_peer(node3)
node3.connect_to_peer(node1)
node3.connect_to_peer(node2)

print("\n--- Scenario 1: Normal Operation ---")
node1.write('user:123', {'name': 'Alice', 'age': 30})
time.sleep(0.3)  # Wait for replication

# Read from different nodes
node1.read('user:123', 'ONE')
node2.read('user:123', 'ONE')  # Eventually consistent
node3.read('user:123', 'ONE')

print("\n--- Scenario 2: Read Before Replication ---")
node1.write('user:456', {'name': 'Bob', 'age': 25})
node2.read('user:456', 'ONE')  # May not have arrived yet
time.sleep(0.3)
node2.read('user:456', 'ONE')  # Now it has been replicated

print("\n--- Scenario 3: Conflicting Writes ---")
# Concurrent writes on different nodes
node1.write('user:789', {'name': 'Charlie', 'version': 1})
node2.write('user:789', {'name': 'Charlie Updated', 'version': 2})
time.sleep(0.5)  # Wait for convergence

# Last-Write-Wins: the most recent value wins
print("\nAfter convergence:")
for node in [node1, node2, node3]:
    value = node.read('user:789', 'ONE')

print("\n--- Node Stats ---")
for node in [node1, node2, node3]:
    print(f"{node.node_id}: {node.get_stats()}")

Conflict resolution strategies:

class ConflictResolution:
    """Diferentes estrategias para resolver conflictos"""
    
    @staticmethod
    def last_write_wins(values_with_timestamps):
        """LWW: El valor con timestamp más reciente gana"""
        return max(values_with_timestamps, key=lambda x: x['timestamp'])
    
    @staticmethod
    def first_write_wins(values_with_timestamps):
        """FWW: El primer valor gana"""
        return min(values_with_timestamps, key=lambda x: x['timestamp'])
    
    @staticmethod
    def merge_sets(values):
        """Para sets: union de todos los valores"""
        result = set()
        for value in values:
            result.update(value)
        return result
    
    @staticmethod
    def custom_merge(values, merge_function):
        """Merge custom definido por la aplicación"""
        return merge_function(values)
    
    @staticmethod
    def vector_clock_resolution(values_with_clocks):
        """Usar vector clocks para detectar causalidad"""
        # Implementación simplificada
        for v1 in values_with_clocks:
            is_dominated = False
            for v2 in values_with_clocks:
                if v1 != v2 and all(v1['clock'][k] <= v2['clock'].get(k, 0) for k in v1['clock']):
                    is_dominated = True
                    break
            
            if not is_dominated:
                return v1  # This value is not dominated by any other
        
        return values_with_clocks[0]  # Genuine conflict: fall back to another strategy

# Example: shopping cart with merge
class ShoppingCart:
    """Carrito de compras con consistencia eventual"""
    
    @staticmethod
    def merge_carts(cart1: set, cart2: set):
        """Merge de carritos: union de items"""
        # ADD WINS semántica: nunca perdemos items
        return cart1.union(cart2)

cart_node1 = {'item1', 'item2'}
cart_node2 = {'item2', 'item3'}
merged = ShoppingCart.merge_carts(cart_node1, cart_node2)
print(f"Merged cart: {merged}")  # {'item1', 'item2', 'item3'}

When to use eventual consistency:

  - Social feeds, likes, and view counters
  - Product catalogs and shopping carts
  - Analytics, metrics, and logs
  - Read-heavy data where brief staleness is acceptable

When NOT to use it:

  - Money transfers and account balances
  - Inventory where overselling is unacceptable
  - Authentication and authorization decisions


Recipe 2.5: Load Balancing and Caching

Load Balancing: distributing traffic across servers

from typing import List
from dataclasses import dataclass
import hashlib
import random

@dataclass
class Server:
    """Servidor backend"""
    id: str
    host: str
    port: int
    weight: int = 1  # For weighted round-robin
    health: str = 'healthy'  # healthy, unhealthy
    active_connections: int = 0

class LoadBalancer:
    """Load balancer con múltiples algoritmos"""
    
    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.round_robin_index = 0
        self.request_count = 0
    
    def get_server_round_robin(self) -> Server:
        """Round Robin: Rotar entre servidores equitativamente"""
        healthy_servers = [s for s in self.servers if s.health == 'healthy']
        if not healthy_servers:
            raise Exception("No healthy servers")
        
        server = healthy_servers[self.round_robin_index % len(healthy_servers)]
        self.round_robin_index += 1
        return server
    
    def get_server_weighted_round_robin(self) -> Server:
        """Weighted Round Robin: Considerar capacidad del servidor"""
        healthy_servers = [s for s in self.servers if s.health == 'healthy']
        if not healthy_servers:
            raise Exception("No healthy servers")
        
        # Build a list with weight-based repetitions
        weighted_servers = []
        for server in healthy_servers:
            weighted_servers.extend([server] * server.weight)
        
        server = weighted_servers[self.round_robin_index % len(weighted_servers)]
        self.round_robin_index += 1
        return server
    
    def get_server_least_connections(self) -> Server:
        """Least Connections: Servidor con menos conexiones activas"""
        healthy_servers = [s for s in self.servers if s.health == 'healthy']
        if not healthy_servers:
            raise Exception("No healthy servers")
        
        return min(healthy_servers, key=lambda s: s.active_connections)
    
    def get_server_ip_hash(self, client_ip: str) -> Server:
        """IP Hash: Mismo cliente siempre al mismo servidor (sticky sessions)"""
        healthy_servers = [s for s in self.servers if s.health == 'healthy']
        if not healthy_servers:
            raise Exception("No healthy servers")
        
        # Deterministic hash of the client IP
        hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
        index = hash_value % len(healthy_servers)
        return healthy_servers[index]
    
    def get_server_random(self) -> Server:
        """Random: Servidor aleatorio"""
        healthy_servers = [s for s in self.servers if s.health == 'healthy']
        if not healthy_servers:
            raise Exception("No healthy servers")
        
        return random.choice(healthy_servers)
    
    def health_check(self):
        """Health check de servidores"""
        for server in self.servers:
            # In production: an HTTP health check
            # Simulation: random failures
            if random.random() < 0.1:  # 10% chance of failure
                server.health = 'unhealthy'
                print(f"⚠️  {server.id} is unhealthy")
            else:
                server.health = 'healthy'
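
# Note: the IP hash above is modulo-based, so adding or removing a server
# remaps most clients. Consistent hashing keeps most assignments stable when
# the server set changes. A minimal sketch; ConsistentHashRing and its
# parameters are illustrative, not part of the LoadBalancer above.
import bisect

class ConsistentHashRing:
    """Each server owns many points on a hash ring; a client maps to the
    first point clockwise from its own hash, so changing the server set
    only remaps roughly 1/N of the clients."""
    
    def __init__(self, servers: List[Server], virtual_nodes: int = 100):
        pairs = []
        for server in servers:
            for i in range(virtual_nodes):
                pairs.append((self._hash(f"{server.id}#{i}"), server))
        pairs.sort(key=lambda p: p[0])
        self.hashes = [h for h, _ in pairs]   # sorted ring positions
        self.nodes = [s for _, s in pairs]    # server at each position
    
    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def get_server(self, client_ip: str) -> Server:
        """First virtual node clockwise from the client's hash"""
        idx = bisect.bisect(self.hashes, self._hash(client_ip)) % len(self.hashes)
        return self.nodes[idx]

ring = ConsistentHashRing([Server('s1', '10.0.0.1', 8080),
                           Server('s2', '10.0.0.2', 8080)])
print(ring.get_server('192.168.1.1').id)  # stable even if a server is added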

# ===== CACHING STRATEGIES =====
import time
from collections import OrderedDict

class LRUCache:
    """Least Recently Used Cache"""
    
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity
        self.hits = 0
        self.misses = 0
    
    def get(self, key: str):
        """Obtener del cache"""
        if key in self.cache:
            # Move to the end (most recently used)
            self.cache.move_to_end(key)
            self.hits += 1
            return self.cache[key]
        
        self.misses += 1
        return None
    
    def put(self, key: str, value):
        """Agregar al cache"""
        if key in self.cache:
            self.cache.move_to_end(key)
        else:
            if len(self.cache) >= self.capacity:
                # Evict the least recently used (first item)
                self.cache.popitem(last=False)
        
        self.cache[key] = value
    
    def get_stats(self):
        """Estadísticas de cache"""
        total = self.hits + self.misses
        hit_rate = (self.hits / total * 100) if total > 0 else 0
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': f'{hit_rate:.1f}%',
            'size': len(self.cache)
        }

class CacheAside:
    """Cache-Aside pattern (Lazy Loading)"""
    
    def __init__(self, cache: LRUCache, database):
        self.cache = cache
        self.database = database
    
    def get(self, key: str):
        """
        1. Buscar en cache
        2. Si no está, buscar en DB
        3. Guardar en cache
        """
        # 1. Buscar en cache
        value = self.cache.get(key)
        if value is not None:
            print(f"✓ Cache HIT for {key}")
            return value
        
        # 2. Cache MISS: fetch from the DB
        print(f"✗ Cache MISS for {key}, fetching from DB...")
        time.sleep(0.1)  # Simulate DB latency
        value = self.database.get(key)
        
        # 3. Store it in the cache
        if value is not None:
            self.cache.put(key, value)
        
        return value
    
    def update(self, key: str, value):
        """
        Write-Through: Actualizar DB y cache
        """
        # 1. Actualizar DB
        self.database[key] = value
        
        # 2. Update the cache
        self.cache.put(key, value)

class WriteBackCache:
    """Write-Back pattern: escribir a DB asíncronamente"""
    
    def __init__(self, cache: LRUCache, database, flush_interval=5):
        self.cache = cache
        self.database = database
        self.dirty_keys = set()
        self.flush_interval = flush_interval
        
        # Background flusher
        import threading
        self.flusher_thread = threading.Thread(target=self._flush_periodically, daemon=True)
        self.flusher_thread.start()
    
    def write(self, key: str, value):
        """Escribir en cache (lazy DB write)"""
        self.cache.put(key, value)
        self.dirty_keys.add(key)
        print(f"✓ Wrote {key} to cache (DB write pending)")
    
    def _flush_periodically(self):
        """Flush cache a DB periódicamente"""
        while True:
            time.sleep(self.flush_interval)
            self._flush()
    
    def _flush(self):
        """Flush dirty keys a DB"""
        if self.dirty_keys:
            print(f"\n🔄 Flushing {len(self.dirty_keys)} keys to DB...")
            for key in list(self.dirty_keys):
                value = self.cache.get(key)
                if value is not None:
                    self.database[key] = value
            
            self.dirty_keys.clear()
            print("✓ Flush complete")

# ===== CACHE INVALIDATION =====
class CacheInvalidation:
    """Estrategias de invalidación de cache"""
    
    @staticmethod
    def ttl_based(cache_entry, ttl_seconds=300):
        """Time-To-Live: invalidar después de N segundos"""
        age = time.time() - cache_entry['timestamp']
        return age > ttl_seconds
    
    @staticmethod
    def event_based(cache, event_type, affected_keys):
        """Event-based: invalidar cuando cambian datos"""
        for key in affected_keys:
            cache.cache.pop(key, None)
        print(f"✓ Invalidated {len(affected_keys)} keys due to {event_type}")
    
    @staticmethod
    def tag_based(cache, tag, entry_tags):
        """Tag-based: invalidate by category.
        entry_tags maps keys to tags, e.g. {'user:123': ['users', 'profile']};
        invalidating tag 'users' evicts every entry carrying that tag."""
        affected = [key for key, tags in entry_tags.items() if tag in tags]
        for key in affected:
            cache.cache.pop(key, None)
        print(f"✓ Invalidated {len(affected)} keys tagged '{tag}'")

# ===== DEMO =====
print("=== Load Balancing Demo ===\n")

servers = [
    Server('server1', '10.0.0.1', 8080, weight=3),
    Server('server2', '10.0.0.2', 8080, weight=2),
    Server('server3', '10.0.0.3', 8080, weight=1),
]

lb = LoadBalancer(servers)

# Round Robin
print("--- Round Robin ---")
for i in range(6):
    server = lb.get_server_round_robin()
    print(f"Request {i+1}{server.id}")

# Weighted Round Robin
print("\n--- Weighted Round Robin ---")
lb.round_robin_index = 0
for i in range(6):
    server = lb.get_server_weighted_round_robin()
    print(f"Request {i+1}{server.id}")

# IP Hash (sticky sessions)
print("\n--- IP Hash (Sticky Sessions) ---")
clients = ['192.168.1.1', '192.168.1.2', '192.168.1.1', '192.168.1.3', '192.168.1.1']
for i, client_ip in enumerate(clients):
    server = lb.get_server_ip_hash(client_ip)
    print(f"Request {i+1} from {client_ip}{server.id}")

print("\n=== Caching Demo ===\n")

# Setup
database = {
    'user:1': {'name': 'Alice', 'age': 30},
    'user:2': {'name': 'Bob', 'age': 25},
    'user:3': {'name': 'Charlie', 'age': 35}
}

cache = LRUCache(capacity=2)
cache_aside = CacheAside(cache, database)

# Cache-Aside pattern
print("--- Cache-Aside Pattern ---")
cache_aside.get('user:1')  # MISS
cache_aside.get('user:1')  # HIT
cache_aside.get('user:2')  # MISS
cache_aside.get('user:3')  # MISS (evicts user:1)
cache_aside.get('user:1')  # MISS again

print(f"\nCache stats: {cache.get_stats()}")

# Write-Back pattern
print("\n--- Write-Back Pattern ---")
wb_cache = WriteBackCache(LRUCache(10), {})
wb_cache.write('config:theme', 'dark')
wb_cache.write('config:language', 'es')
print("Waiting for flush...")
time.sleep(6)
print(f"DB contents: {wb_cache.database}")

Caching strategies by layer:

| Layer | Example | Typical TTL |
| --- | --- | --- |
| CDN | Cloudflare, Akamai | Hours/Days |
| Application | Redis, Memcached | Minutes/Hours |
| Database | Query cache | Seconds/Minutes |
| Browser | Local storage | Days/Weeks |
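
The TTL column above maps directly onto code: a small wrapper that stores each value with a timestamp and treats expired entries as misses. A minimal sketch reusing the CacheInvalidation.ttl_based check defined earlier; the TTLCache class itself is an assumption for illustration:

import time

class TTLCache:
    """Entries expire after ttl_seconds; expired reads count as misses."""
    
    def __init__(self, ttl_seconds=300):
        self.ttl_seconds = ttl_seconds
        self.entries = {}  # {key: {'value': ..., 'timestamp': ...}}
    
    def put(self, key, value):
        self.entries[key] = {'value': value, 'timestamp': time.time()}
    
    def get(self, key):
        entry = self.entries.get(key)
        if entry is None:
            return None
        if CacheInvalidation.ttl_based(entry, self.ttl_seconds):
            del self.entries[key]  # expired: evict and report a miss
            return None
        return entry['value']

ttl_cache = TTLCache(ttl_seconds=2)
ttl_cache.put('session:42', {'user': 'alice'})
print(ttl_cache.get('session:42'))  # value (still fresh)
time.sleep(2.1)
print(ttl_cache.get('session:42'))  # None (expired)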

Databases

Recipe 2.6: Advanced SQL - Optimization

Indexes: the secret weapon of performance

-- ===== INDEXES =====

-- Example table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE,
    name VARCHAR(100),
    age INTEGER,
    city VARCHAR(100),
    created_at TIMESTAMP DEFAULT NOW()
);

-- ❌ WITHOUT AN INDEX: full table scan - O(n)
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Seq Scan on users (cost=0.00..1234.00 rows=1)

-- ✅ WITH AN INDEX: index scan - O(log n)
CREATE INDEX idx_users_email ON users(email);

EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Index Scan using idx_users_email (cost=0.42..8.44 rows=1)

-- COMPOSITE INDEXES (multiple columns)
CREATE INDEX idx_users_city_age ON users(city, age);

-- ✅ GOOD: uses the index
SELECT * FROM users WHERE city = 'NYC' AND age > 25;

-- ⚠️  USUALLY NOT USED: age is not the leading column of the index
SELECT * FROM users WHERE age > 25;  -- can't seek into idx_users_city_age

-- ❌ BAD: doesn't use the index
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- ✅ SOLUTION: a functional index
CREATE INDEX idx_users_email_lower ON users(LOWER(email));
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- PARTIAL INDEX (only a subset of the rows)
CREATE INDEX idx_active_users ON users(city) WHERE created_at > '2024-01-01';

-- Useful for queries that filter recent data
SELECT * FROM users WHERE city = 'LA' AND created_at > '2024-01-01';

-- COVERING INDEX (includes extra columns)
CREATE INDEX idx_users_city_covering ON users(city) INCLUDE (name, age);

-- Query satisfied entirely from the index (no table read)
SELECT name, age FROM users WHERE city = 'Boston';

Window Functions: powerful analytics

-- ===== WINDOW FUNCTIONS =====

CREATE TABLE sales (
    id SERIAL PRIMARY KEY,
    salesperson VARCHAR(100),
    region VARCHAR(50),
    amount DECIMAL(10, 2),
    sale_date DATE
);

-- ROW_NUMBER: number the rows
SELECT 
    salesperson,
    amount,
    ROW_NUMBER() OVER (ORDER BY amount DESC) as rank
FROM sales;

-- RANK with ties
SELECT 
    salesperson,
    amount,
    RANK() OVER (ORDER BY amount DESC) as rank,
    DENSE_RANK() OVER (ORDER BY amount DESC) as dense_rank
FROM sales;

-- PARTITION BY: one window per group
SELECT 
    salesperson,
    region,
    amount,
    RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region
FROM sales;

-- Running totals (cumulative sum)
SELECT 
    sale_date,
    amount,
    SUM(amount) OVER (ORDER BY sale_date) as running_total
FROM sales
ORDER BY sale_date;

-- Moving average (7-day rolling average)
SELECT 
    sale_date,
    amount,
    AVG(amount) OVER (
        ORDER BY sale_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7days
FROM sales;

-- LAG and LEAD: access previous/next rows
SELECT 
    sale_date,
    amount,
    LAG(amount, 1) OVER (ORDER BY sale_date) as prev_day,
    LEAD(amount, 1) OVER (ORDER BY sale_date) as next_day,
    amount - LAG(amount, 1) OVER (ORDER BY sale_date) as day_over_day_change
FROM sales;

-- NTILE: split into quartiles/percentiles
SELECT 
    salesperson,
    amount,
    NTILE(4) OVER (ORDER BY amount) as quartile
FROM sales;

Common Table Expressions (CTEs): readable queries

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Setup
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
session = Session()

def demonstrate_ctes():
    """CTEs para queries complejas y legibles"""
    
    # ===== SIMPLE CTE =====
    query = text("""
        WITH high_value_customers AS (
            SELECT 
                customer_id,
                SUM(order_total) as lifetime_value
            FROM orders
            GROUP BY customer_id
            HAVING SUM(order_total) > 10000
        )
        SELECT 
            c.name,
            c.email,
            hvc.lifetime_value
        FROM customers c
        JOIN high_value_customers hvc ON c.id = hvc.customer_id
        ORDER BY hvc.lifetime_value DESC;
    """)
    
    # ===== RECURSIVE CTE: hierarchies =====
    org_chart_query = text("""
        -- Employees table with manager_id
        WITH RECURSIVE employee_hierarchy AS (
            -- Base case: the CEO (no manager)
            SELECT 
                id,
                name,
                manager_id,
                1 as level,
                name as path
            FROM employees
            WHERE manager_id IS NULL
            
            UNION ALL
            
            -- Recursive case: employees with managers
            SELECT 
                e.id,
                e.name,
                e.manager_id,
                eh.level + 1,
                eh.path || ' > ' || e.name
            FROM employees e
            JOIN employee_hierarchy eh ON e.manager_id = eh.id
        )
        SELECT 
            level,
            name,
            path
        FROM employee_hierarchy
        ORDER BY level, name;
    """)
    
    # ===== MULTIPLE CTEs =====
    complex_query = text("""
        WITH 
        -- CTE 1: monthly sales
        monthly_sales AS (
            SELECT 
                DATE_TRUNC('month', sale_date) as month,
                SUM(amount) as total_sales
            FROM sales
            GROUP BY DATE_TRUNC('month', sale_date)
        ),
        -- CTE 2: moving average
        sales_with_avg AS (
            SELECT 
                month,
                total_sales,
                AVG(total_sales) OVER (
                    ORDER BY month
                    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
                ) as moving_avg_3months
            FROM monthly_sales
        ),
        -- CTE 3: Growth rate
        sales_with_growth AS (
            SELECT 
                month,
                total_sales,
                moving_avg_3months,
                (total_sales - LAG(total_sales) OVER (ORDER BY month)) / 
                    LAG(total_sales) OVER (ORDER BY month) * 100 as growth_rate
            FROM sales_with_avg
        )
        SELECT * FROM sales_with_growth
        WHERE growth_rate IS NOT NULL
        ORDER BY month DESC;
    """)
    
    results = session.execute(complex_query)
    return results.fetchall()

# ===== QUERY OPTIMIZATION =====
def query_optimization_tips():
    """Tips para optimizar queries"""
    
    # ❌ BAD: N+1 queries
    users = session.execute(text("SELECT * FROM users")).fetchall()
    for user in users:
        # One extra query per user (N queries)
        orders = session.execute(
            text("SELECT * FROM orders WHERE user_id = :user_id"),
            {"user_id": user.id}
        ).fetchall()
    
    # ✅ GOOD: JOIN (1 query)
    results = session.execute(text("""
        SELECT 
            u.*,
            o.*
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
    """)).fetchall()
    
    # ❌ BAD: SELECT *
    session.execute(text("SELECT * FROM users"))  # Fetches every column
    
    # ✅ GOOD: select only the columns you need
    session.execute(text("SELECT id, email, name FROM users"))
    
    # ❌ BAD: OR across different columns (often can't use the indexes)
    session.execute(text("""
        SELECT * FROM products 
        WHERE category = 'electronics' OR brand = 'Apple'
    """))
    
    # ✅ GOOD: UNION of indexed queries
    session.execute(text("""
        SELECT * FROM products WHERE category = 'electronics'
        UNION
        SELECT * FROM products WHERE brand = 'Apple'
    """))
    
    # ✅ BETTER: for an AND condition, use a composite index
    # CREATE INDEX idx_products_category_brand ON products(category, brand)
    session.execute(text("""
        SELECT * FROM products 
        WHERE category = 'electronics' AND brand = 'Apple'
    """))

Transactions and isolation levels:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)

@contextmanager
def transaction_scope(isolation_level='READ COMMITTED'):
    """Context manager para transacciones"""
    session = Session()
    session.connection(execution_options={"isolation_level": isolation_level})
    
    try:
        yield session
        session.commit()
    except Exception as e:
        session.rollback()
        raise
    finally:
        session.close()

# ===== ACID PROPERTIES =====

def demonstrate_transactions():
    """Demonstrate ACID properties"""
    
    # ATOMICITY: All or nothing
    with transaction_scope() as session:
        session.execute(text("""
            UPDATE accounts SET balance = balance - 100 WHERE id = 1
        """))
        session.execute(text("""
            UPDATE accounts SET balance = balance + 100 WHERE id = 2
        """))
        # Both operations happen or neither does (commit/rollback)
    
    # ISOLATION LEVELS
    
    # 1. READ UNCOMMITTED (dirty reads possible)
    with transaction_scope('READ UNCOMMITTED') as session:
        # May see uncommitted changes from other transactions
        pass
    
    # 2. READ COMMITTED (default in PostgreSQL)
    with transaction_scope('READ COMMITTED') as session:
        # Only sees committed changes,
        # but repeated reads may return different data
        pass
    
    # 3. REPEATABLE READ
    with transaction_scope('REPEATABLE READ') as session:
        # Repeated reads within the transaction return the same data
        # Phantom reads still possible (new rows)
        pass
    
    # 4. SERIALIZABLE (strictest)
    with transaction_scope('SERIALIZABLE') as session:
        # Transactions execute as if they were sequential
        # No phantom reads, but may fail with serialization errors
        pass
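
As an illustration, a non-repeatable read under READ COMMITTED can be reproduced with two concurrent sessions. A sketch, assuming the accounts table from the ATOMICITY example above:

# Sketch: two sessions observing a non-repeatable read under READ COMMITTED.
reader = Session()
writer = Session()

first = reader.execute(text("SELECT balance FROM accounts WHERE id = 1")).scalar()

# Another transaction commits a change between the reader's two reads
writer.execute(text("UPDATE accounts SET balance = balance + 50 WHERE id = 1"))
writer.commit()

second = reader.execute(text("SELECT balance FROM accounts WHERE id = 1")).scalar()
print(first == second)  # False under READ COMMITTED; True under REPEATABLE READ

reader.rollback()
reader.close()
writer.close()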

# ===== DEADLOCKS AND OPTIMISTIC LOCKING =====

def handle_deadlocks():
    """Handle deadlocks with retries"""
    from sqlalchemy.exc import OperationalError
    import time
    
    max_retries = 3
    for attempt in range(max_retries):
        try:
            with transaction_scope() as session:
                # Operations that may deadlock
                session.execute(text("""
                    UPDATE inventory 
                    SET quantity = quantity - 1 
                    WHERE product_id = :id
                """), {"id": 123})
                
                session.execute(text("""
                    INSERT INTO order_items (product_id, quantity) 
                    VALUES (:id, 1)
                """), {"id": 123})
                
                break  # Success
        
        except OperationalError as e:
            if 'deadlock detected' in str(e).lower():
                if attempt < max_retries - 1:
                    print(f"Deadlock detected, retrying... (attempt {attempt + 1})")
                    time.sleep(0.1 * (attempt + 1))  # Linear backoff
                else:
                    raise
            else:
                raise  # Not a deadlock: don't swallow the error

# Optimistic locking with a version number
class ProductWithVersion:
    """
    CREATE TABLE products (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        price DECIMAL(10,2),
        version INTEGER DEFAULT 1
    )
    """
    
    @staticmethod
    def update_with_optimistic_lock(product_id, new_price, expected_version):
        with transaction_scope() as session:
            result = session.execute(text("""
                UPDATE products 
                SET price = :new_price, version = version + 1
                WHERE id = :id AND version = :expected_version
                RETURNING version
            """), {
                "new_price": new_price,
                "id": product_id,
                "expected_version": expected_version
            })
            
            if result.rowcount == 0:
                raise Exception("Optimistic lock failed: product was modified")
            
            return result.fetchone()[0]  # New version
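
A typical caller re-reads the current version and retries when the optimistic lock fails. A sketch built on the class above (change_price is a hypothetical helper):

def change_price(product_id, new_price, max_retries=3):
    """Sketch: retry loop around the optimistic update above (hypothetical helper)."""
    for _ in range(max_retries):
        # Re-read the current version
        with transaction_scope() as session:
            current_version = session.execute(
                text("SELECT version FROM products WHERE id = :id"),
                {"id": product_id}
            ).scalar()
        try:
            return ProductWithVersion.update_with_optimistic_lock(
                product_id, new_price, expected_version=current_version
            )
        except Exception:
            continue  # Another writer won the race: re-read and try again
    raise Exception("Could not update product after retries")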

Recipe 2.7: NoSQL - When and How to Use It

SQL vs NoSQL comparison:

| Characteristic | SQL            | NoSQL                           |
|----------------|----------------|---------------------------------|
| Schema         | Rigid (tables) | Flexible (documents, key-value) |
| Scalability    | Vertical       | Horizontal                      |
| Transactions   | Full ACID      | Eventual consistency (mostly)   |
| Joins          | ✅ Efficient   | ❌ Limited/avoid                |
| Use cases      | Finance, ERP   | Social networks, IoT, logs      |

MongoDB: Document Store

from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
import json

# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']

# ===== BASIC OPERATIONS =====

def mongodb_crud():
    """CRUD en MongoDB"""
    products = db['products']
    
    # CREATE
    product = {
        'name': 'Laptop Pro',
        'price': 1299.99,
        'specs': {
            'cpu': 'Intel i7',
            'ram': '16GB',
            'storage': '512GB SSD'
        },
        'tags': ['electronics', 'computers'],
        'in_stock': True,
        'created_at': datetime.now()
    }
    result = products.insert_one(product)
    print(f"Inserted ID: {result.inserted_id}")
    
    # READ
    laptop = products.find_one({'name': 'Laptop Pro'})
    print(f"Found: {laptop}")
    
    # UPDATE
    products.update_one(
        {'name': 'Laptop Pro'},
        {'$set': {'price': 1199.99, 'in_stock': False}}
    )
    
    # DELETE
    products.delete_one({'name': 'Laptop Pro'})
    
    # BULK INSERT
    products.insert_many([
        {'name': 'Mouse', 'price': 29.99, 'category': 'accessories'},
        {'name': 'Keyboard', 'price': 79.99, 'category': 'accessories'},
    ])

# ===== ADVANCED QUERIES =====

def mongodb_queries():
    """Queries complejos en MongoDB"""
    products = db['products']
    
    # Queries with operators
    expensive = products.find({'price': {'$gt': 100}})  # Greater than
    electronics = products.find({'tags': {'$in': ['electronics', 'computers']}})
    
    # Text search (requires a text index)
    products.create_index([('name', 'text'), ('description', 'text')])
    search_results = products.find({'$text': {'$search': 'laptop pro'}})
    
    # Aggregation pipeline
    pipeline = [
        # Stage 1: Filter
        {'$match': {'in_stock': True}},
        
        # Stage 2: Group
        {'$group': {
            '_id': '$category',
            'total_products': {'$sum': 1},
            'avg_price': {'$avg': '$price'},
            'max_price': {'$max': '$price'}
        }},
        
        # Stage 3: Sort
        {'$sort': {'avg_price': -1}},
        
        # Stage 4: Limit
        {'$limit': 10}
    ]
    
    results = list(products.aggregate(pipeline))
    print(f"Aggregation results: {json.dumps(results, indent=2, default=str)}")
    
    # Lookup (join)
    orders = db['orders']
    pipeline = [
        {'$lookup': {
            'from': 'products',
            'localField': 'product_id',
            'foreignField': '_id',
            'as': 'product_details'
        }},
        {'$unwind': '$product_details'}
    ]
    
    enriched_orders = list(orders.aggregate(pipeline))

# ===== DATA MODELING =====

def data_modeling_patterns():
    """Data modeling patterns in MongoDB"""
    
    # PATTERN 1: Embedding (denormalization)
    # Use when: one-to-few relationship, data is read together
    user_with_address = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com',
        'address': {  # Embedded document
            'street': '123 Main St',
            'city': 'NYC',
            'zip': '10001'
        },
        'recent_orders': [  # Embedded array
            {'order_id': 'ord_1', 'total': 99.99},
            {'order_id': 'ord_2', 'total': 149.99}
        ]
    }
    
    # PATTERN 2: Referencing (normalization)
    # Use when: one-to-many relationship, data is updated independently
    user = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com'
    }
    
    orders = [
        {'_id': 'ord_1', 'user_id': 'user_123', 'total': 99.99},
        {'_id': 'ord_2', 'user_id': 'user_123', 'total': 149.99}
    ]
    
    # PATTERN 3: Hybrid (combination)
    # Denormalize critical data, reference the rest
    blog_post = {
        '_id': 'post_456',
        'title': 'MongoDB Best Practices',
        'author': {  # Denormalize the author's basic data
            'id': 'user_123',
            'name': 'Alice',
            'avatar': 'url'
        },
        'comments_count': 42,  # Denormalized count
        'comment_ids': ['cmt_1', 'cmt_2']  # Reference the full comments
    }
    
    # PATTERN 4: Bucketing (for time-series data)
    # Group multiple events into a single document
    sensor_readings_bucket = {
        '_id': 'sensor_123_2024-01-15_10',
        'sensor_id': 'sensor_123',
        'date': datetime(2024, 1, 15, 10),
        'readings': [
            {'time': '10:00', 'temp': 22.5, 'humidity': 45},
            {'time': '10:01', 'temp': 22.6, 'humidity': 46},
            # ... up to 60 readings (1 per minute)
        ]
    }
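
With bucketing, new readings are appended to the current hour's document via an upsert, so a new bucket is created only when the hour rolls over. A sketch (the sensor_readings collection name is an assumption):

def append_reading(sensor_id, when, temp, humidity):
    """Sketch: append a reading to its hourly bucket (bucket created on demand)."""
    bucket_id = f"{sensor_id}_{when:%Y-%m-%d_%H}"
    db['sensor_readings'].update_one(
        {'_id': bucket_id},
        {
            # Set bucket metadata only when the document is first created
            '$setOnInsert': {
                'sensor_id': sensor_id,
                'date': when.replace(minute=0, second=0, microsecond=0)
            },
            # Append the reading to the bucket's array
            '$push': {'readings': {
                'time': when.strftime('%H:%M'), 'temp': temp, 'humidity': humidity
            }}
        },
        upsert=True
    )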

# ===== INDEXES AND PERFORMANCE =====

def mongodb_indexes():
    """Indexes in MongoDB"""
    products = db['products']
    
    # Simple index
    products.create_index([('price', ASCENDING)])
    
    # Compound index
    products.create_index([
        ('category', ASCENDING),
        ('price', DESCENDING)
    ])
    
    # Unique index
    users = db['users']
    users.create_index([('email', ASCENDING)], unique=True)
    
    # TTL index (auto-delete after N seconds)
    sessions = db['sessions']
    sessions.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)
    
    # Text index
    products.create_index([('name', 'text'), ('description', 'text')])
    
    # List indexes
    print("Indexes:", list(products.list_indexes()))
    
    # Explain to inspect index usage
    explain = products.find({'price': {'$gt': 100}}).explain()
    print(f"Query plan: {explain['executionStats']}")

Redis: In-Memory Key-Value Store

import redis
import json
import time  # Used by RateLimiter below
from datetime import timedelta

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ===== BASIC OPERATIONS =====

def redis_basics():
    """Operaciones básicas en Redis"""
    
    # STRING operations
    r.set('user:1000:name', 'Alice')
    r.set('user:1000:email', 'alice@example.com')
    
    name = r.get('user:1000:name')
    print(f"Name: {name}")
    
    # SET with expiration
    r.setex('session:abc123', timedelta(hours=1), 'user_data')
    
    # Atomic increment
    r.set('page:views', 0)
    r.incr('page:views')
    r.incrby('page:views', 10)
    views = r.get('page:views')
    
    # HASH operations (objects)
    r.hset('user:1000', mapping={
        'name': 'Alice',
        'email': 'alice@example.com',
        'age': '30'
    })
    
    user = r.hgetall('user:1000')
    print(f"User: {user}")
    
    r.hincrby('user:1000', 'age', 1)  # Increment age
    
    # LIST operations (queues)
    r.lpush('queue:emails', 'email1@example.com')  # Push left
    r.rpush('queue:emails', 'email2@example.com')  # Push right
    
    email = r.lpop('queue:emails')  # Pop left (FIFO)
    print(f"Processing: {email}")
    
    # SET operations (unique values)
    r.sadd('tags:post:123', 'python', 'redis', 'nosql')
    r.sadd('tags:post:456', 'python', 'mongodb')
    
    # Intersection (common tags)
    common_tags = r.sinter('tags:post:123', 'tags:post:456')
    print(f"Common tags: {common_tags}")
    
    # SORTED SET (leaderboards)
    r.zadd('leaderboard', {
        'Alice': 1500,
        'Bob': 1200,
        'Charlie': 1800
    })
    
    # Top 3
    top3 = r.zrevrange('leaderboard', 0, 2, withscores=True)
    print(f"Leaderboard: {top3}")
    
    # A user's rank
    rank = r.zrevrank('leaderboard', 'Alice')
    print(f"Alice's rank: {rank + 1}")

# ===== USE CASES =====

class RateLimiter:
    """Rate limiter backed by Redis"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def is_allowed(self, user_id: str, max_requests: int = 10, window_seconds: int = 60):
        """
        Sliding-window rate limiter:
        allows max_requests within window_seconds
        """
        key = f"rate_limit:{user_id}"
        current_time = int(time.time())
        window_start = current_time - window_seconds
        
        # Evict requests that fell out of the window
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # Count requests still inside the window
        request_count = self.redis.zcard(key)
        
        if request_count < max_requests:
            # Record the current request
            self.redis.zadd(key, {str(current_time): current_time})
            self.redis.expire(key, window_seconds)
            return True
        
        return False
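
Note that the check-then-add above is not atomic: two concurrent requests can both pass the count check. A common remedy is to run the whole window logic as a single Lua script, which Redis executes atomically. A sketch with the same key layout (is_allowed_atomic is a hypothetical variant):

# Sketch: atomic sliding-window check via a Lua script (Redis runs scripts atomically).
SLIDING_WINDOW_LUA = """
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
if redis.call('ZCARD', KEYS[1]) < tonumber(ARGV[2]) then
    redis.call('ZADD', KEYS[1], ARGV[3], ARGV[3])
    redis.call('EXPIRE', KEYS[1], ARGV[4])
    return 1
end
return 0
"""
sliding_window_script = r.register_script(SLIDING_WINDOW_LUA)

def is_allowed_atomic(user_id: str, max_requests: int = 10, window_seconds: int = 60):
    now = int(time.time())  # Second resolution, as in the class above
    return bool(sliding_window_script(
        keys=[f"rate_limit:{user_id}"],
        args=[now - window_seconds, max_requests, now, window_seconds]
    ))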

class DistributedLock:
    """Distributed lock backed by Redis"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def acquire_lock(self, lock_name: str, timeout: int = 10):
        """Acquire the lock"""
        lock_key = f"lock:{lock_name}"
        
        # SET NX (only if it doesn't exist) with expiration
        acquired = self.redis.set(
            lock_key,
            'locked',
            nx=True,
            ex=timeout
        )
        
        return acquired
    
    def release_lock(self, lock_name: str):
        """Release the lock"""
        self.redis.delete(f"lock:{lock_name}")
    
    def with_lock(self, lock_name: str):
        """Context manager for the lock"""
        from contextlib import contextmanager
        
        @contextmanager
        def _lock():
            acquired = self.acquire_lock(lock_name)
            if not acquired:
                raise Exception(f"Could not acquire lock: {lock_name}")
            
            try:
                yield
            finally:
                self.release_lock(lock_name)
        
        return _lock()
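
One caveat: release_lock deletes the key unconditionally, so a client whose lock has already expired can delete a lock now held by someone else. The usual fix is to store a unique token and delete only if it still matches, atomically via Lua. A sketch (the helper names are hypothetical):

import uuid

# Delete the lock only if its value still matches our token (atomic in Redis)
RELEASE_IF_OWNED_LUA = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
end
return 0
"""

def acquire_lock_safe(redis_client, lock_name: str, timeout: int = 10):
    """Sketch: acquire a lock whose value is a token unique to this holder."""
    token = str(uuid.uuid4())
    if redis_client.set(f"lock:{lock_name}", token, nx=True, ex=timeout):
        return token
    return None

def release_lock_safe(redis_client, lock_name: str, token: str):
    """Release only if we still own the lock (check-and-delete in one step)."""
    return redis_client.eval(RELEASE_IF_OWNED_LUA, 1, f"lock:{lock_name}", token)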

class SessionStore:
    """Session storage con Redis"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def create_session(self, user_id: str, session_data: dict, ttl: int = 3600):
        """Crear sesión"""
        import uuid
        session_id = str(uuid.uuid4())
        
        self.redis.setex(
            f"session:{session_id}",
            ttl,
            json.dumps({**session_data, 'user_id': user_id})
        )
        
        return session_id
    
    def get_session(self, session_id: str):
        """Obtener sesión"""
        data = self.redis.get(f"session:{session_id}")
        return json.loads(data) if data else None
    
    def extend_session(self, session_id: str, ttl: int = 3600):
        """Extender sesión"""
        self.redis.expire(f"session:{session_id}", ttl)
    
    def delete_session(self, session_id: str):
        """Eliminar sesión"""
        self.redis.delete(f"session:{session_id}")

# Using the components

rate_limiter = RateLimiter(r)
for i in range(15):
    allowed = rate_limiter.is_allowed('user_123', max_requests=10, window_seconds=60)
    print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Rate limited'}")

distributed_lock = DistributedLock(r)
with distributed_lock.with_lock('critical_section'):
    print("Doing critical work...")
    time.sleep(1)

session_store = SessionStore(r)
sid = session_store.create_session('user_456', {'theme': 'dark'})
print(f"Session created: {sid}")
session = session_store.get_session(sid)
print(f"Session data: {session}")

When to use each type of NoSQL:

  1. Document Store (MongoDB, CouchDB)

    • ✅ Flexible schemas that evolve over time
    • ✅ Hierarchical (JSON-like) data
    • ✅ Content management, product catalogs
  2. Key-Value (Redis, DynamoDB)

    • ✅ Caching
    • ✅ Session storage
    • ✅ Real-time analytics
    • ✅ Rate limiting
  3. Column-Family (Cassandra, HBase)

    • ✅ Time-series data
    • ✅ Logs at massive scale
    • ✅ IoT sensor data
  4. Graph (Neo4j, ArangoDB)

    • ✅ Social networks
    • ✅ Recommendation engines
    • ✅ Fraud detection
    • ✅ Knowledge graphs

APIs and Communication Between Services

Recipe 2.8: REST vs GraphQL vs gRPC

Comparison:

| Characteristic      | REST               | GraphQL          | gRPC              |
|---------------------|--------------------|------------------|-------------------|
| Protocol            | HTTP/JSON          | HTTP/JSON        | HTTP/2 + Protobuf |
| Queries             | Multiple endpoints | Single endpoint  | RPC calls         |
| Over/under-fetching | ❌ Common          | ✅ Solved        | ✅ Avoided        |
| Performance         | 🟡 Medium          | 🟡 Medium        | 🟢 High           |
| Typing              | ❌ Weak            | ✅ Strong        | ✅ Strong         |
| Streaming           | ❌ Hard            | ⚠️ Subscriptions | ✅ Bidirectional  |
| Caching             | ✅ HTTP cache      | ❌ Complex       | ❌ Custom         |
| Learning curve      | 🟢 Easy            | 🟡 Medium        | 🔴 Steep          |

A complete REST API:

from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# ===== AUTHENTICATION & AUTHORIZATION =====

def token_required(f):
    """Decorator para proteger endpoints"""
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'error': 'Token missing'}), 401
        
        try:
            # Header format: "Bearer eyJhbGc..."
            token = token.split(' ')[1]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']
        except (IndexError, KeyError, jwt.PyJWTError):
            return jsonify({'error': 'Token invalid'}), 401
        
        return f(current_user, *args, **kwargs)
    
    return decorated

@app.route('/auth/login', methods=['POST'])
def login():
    """Login y generar JWT"""
    data = request.json
    
    # Validate credentials (simplified)
    if data.get('email') == 'user@example.com' and data.get('password') == 'password':
        token = jwt.encode({
            'user_id': 'user_123',
            'exp': datetime.utcnow() + timedelta(hours=24)
        }, app.config['SECRET_KEY'])
        
        return jsonify({'token': token}), 200
    
    return jsonify({'error': 'Invalid credentials'}), 401
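
From the client side, the token returned by /auth/login goes back in the Authorization header. A sketch using requests (the localhost:5000 base URL is an assumption, Flask's dev-server default):

import requests

BASE = 'http://localhost:5000'  # Assumed: Flask's default dev server address

# 1. Log in to obtain a JWT
resp = requests.post(f'{BASE}/auth/login',
                     json={'email': 'user@example.com', 'password': 'password'})
token = resp.json()['token']

# 2. Call a protected endpoint using the "Bearer <token>" scheme token_required expects
resp = requests.post(f'{BASE}/api/v1/products',
                     json={'name': 'Keyboard', 'price': 79.99, 'stock': 30},
                     headers={'Authorization': f'Bearer {token}'})
print(resp.status_code, resp.json())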

# ===== RESOURCE ENDPOINTS =====

# Simulated database
products = {
    '1': {'id': '1', 'name': 'Laptop', 'price': 999.99, 'stock': 10},
    '2': {'id': '2', 'name': 'Mouse', 'price': 29.99, 'stock': 50}
}

@app.route('/api/v1/products', methods=['GET'])
def get_products():
    """
    GET /api/v1/products?page=1&limit=10&sort=price&order=desc&category=electronics
    
    Query params:
    - page: pagination
    - limit: items per page
    - sort: field to sort by
    - order: asc or desc
    - filters: category, price_min, price_max, etc.
    """
    # Pagination
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 10))
    
    # Filtering
    category = request.args.get('category')
    price_min = request.args.get('price_min', type=float)
    price_max = request.args.get('price_max', type=float)
    
    # Sorting
    sort_by = request.args.get('sort', 'id')
    order = request.args.get('order', 'asc')
    
    # Apply filters
    filtered_products = list(products.values())
    
    if price_min is not None:
        filtered_products = [p for p in filtered_products if p['price'] >= price_min]
    if price_max is not None:
        filtered_products = [p for p in filtered_products if p['price'] <= price_max]
    
    # Apply sorting
    filtered_products.sort(
        key=lambda x: x.get(sort_by),
        reverse=(order == 'desc')
    )
    
    # Pagination
    start = (page - 1) * limit
    end = start + limit
    paginated = filtered_products[start:end]
    
    # Response with metadata
    return jsonify({
        'data': paginated,
        'metadata': {
            'page': page,
            'limit': limit,
            'total': len(filtered_products),
            'total_pages': (len(filtered_products) + limit - 1) // limit
        },
        'links': {
            'self': f'/api/v1/products?page={page}&limit={limit}',
            'next': f'/api/v1/products?page={page+1}&limit={limit}' if end < len(filtered_products) else None,
            'prev': f'/api/v1/products?page={page-1}&limit={limit}' if page > 1 else None
        }
    }), 200

@app.route('/api/v1/products/<product_id>', methods=['GET'])
def get_product(product_id):
    """GET individual product"""
    product = products.get(product_id)
    
    if not product:
        return jsonify({'error': 'Product not found'}), 404
    
    return jsonify(product), 200

@app.route('/api/v1/products', methods=['POST'])
@token_required
def create_product(current_user):
    """
    POST /api/v1/products
    
    Body:
    {
        "name": "Keyboard",
        "price": 79.99,
        "stock": 30
    }
    """
    data = request.json
    
    # Validation
    if not data.get('name') or not data.get('price'):
        return jsonify({'error': 'Missing required fields'}), 400
    
    # Create product
    product_id = str(len(products) + 1)
    product = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0)
    }
    products[product_id] = product
    
    # Return 201 Created with Location header
    return jsonify(product), 201, {'Location': f'/api/v1/products/{product_id}'}

@app.route('/api/v1/products/<product_id>', methods=['PUT'])
@token_required
def update_product(current_user, product_id):
    """PUT (replace entire resource)"""
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404
    
    data = request.json
    products[product_id] = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0)
    }
    
    return jsonify(products[product_id]), 200

@app.route('/api/v1/products/<product_id>', methods=['PATCH'])
@token_required
def patch_product(current_user, product_id):
    """PATCH (partial update)"""
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404
    
    data = request.json
    product = products[product_id]
    
    # Update only provided fields
    if 'name' in data:
        product['name'] = data['name']
    if 'price' in data:
        product['price'] = data['price']
    if 'stock' in data:
        product['stock'] = data['stock']
    
    return jsonify(product), 200

@app.route('/api/v1/products/<product_id>', methods=['DELETE'])
@token_required
def delete_product(current_user, product_id):
    """DELETE product"""
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404
    
    del products[product_id]
    return '', 204  # No content

# ===== ERROR HANDLING =====

@app.errorhandler(404)
def not_found(error):
    return jsonify({
        'error': 'Not Found',
        'message': 'The requested resource was not found'
    }), 404

@app.errorhandler(500)
def internal_error(error):
    return jsonify({
        'error': 'Internal Server Error',
        'message': 'An unexpected error occurred'
    }), 500

# ===== RATE LIMITING =====

import time

def rate_limit(max_requests=10, window=60):
    """Simple in-memory rate limiter"""
    requests = {}
    
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            client_ip = request.remote_addr
            current_time = time.time()
            
            # Clean old requests
            if client_ip in requests:
                requests[client_ip] = [
                    t for t in requests[client_ip]
                    if current_time - t < window
                ]
            else:
                requests[client_ip] = []
            
            # Check limit
            if len(requests[client_ip]) >= max_requests:
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': window
                }), 429
            
            # Add request
            requests[client_ip].append(current_time)
            
            # Set headers
            response = f(*args, **kwargs)
            if isinstance(response, tuple):
                resp, status = response[0], response[1]
            else:
                resp, status = response, 200
            
            # Return with rate limit headers
            return resp, status, {
                'X-RateLimit-Limit': max_requests,
                'X-RateLimit-Remaining': max_requests - len(requests[client_ip]),
                'X-RateLimit-Reset': int(current_time + window)
            }
        
        return decorated
    return decorator

@app.route('/api/v1/limited', methods=['GET'])
@rate_limit(max_requests=5, window=60)
def limited_endpoint():
    return jsonify({'message': 'This endpoint is rate limited'}), 200

GraphQL API:

import graphene
from graphene import ObjectType, String, Float, Int, List, Field, Schema

# ===== TYPES =====

class ProductType(ObjectType):
    id = String()
    name = String()
    price = Float()
    stock = Int()

class OrderItemType(ObjectType):
    product = Field(ProductType)
    quantity = Int()
    subtotal = Float()

class OrderType(ObjectType):
    id = String()
    user_id = String()
    items = List(OrderItemType)
    total = Float()
    status = String()

# ===== QUERIES =====

class Query(ObjectType):
    # Single product
    product = Field(
        ProductType,
        id=String(required=True),
        description="Get product by ID"
    )
    
    # List products with filtering
    products = List(
        ProductType,
        category=String(),
        price_min=Float(),
        price_max=Float(),
        limit=Int(default_value=10)
    )
    
    # Order with nested products
    order = Field(
        OrderType,
        id=String(required=True)
    )
    
    def resolve_product(self, info, id):
        """Resolver para single product"""
        return products.get(id)
    
    def resolve_products(self, info, category=None, price_min=None, price_max=None, limit=10):
        """Resolver para list products"""
        result = list(products.values())
        
        if price_min:
            result = [p for p in result if p['price'] >= price_min]
        if price_max:
            result = [p for p in result if p['price'] <= price_max]
        
        return result[:limit]
    
    def resolve_order(self, info, id):
        """Resolver para order (with N+1 problem solution)"""
        # En producción: usar DataLoader para batching
        order = orders.get(id)
        if not order:
            return None
        
        # Load the product for each item
        enriched_items = []
        for item in order['items']:
            product = products.get(item['product_id'])
            enriched_items.append({
                'product': product,
                'quantity': item['quantity'],
                'subtotal': product['price'] * item['quantity']
            })
        
        return {
            **order,
            'items': enriched_items
        }

# ===== MUTATIONS =====

class CreateProduct(graphene.Mutation):
    class Arguments:
        name = String(required=True)
        price = Float(required=True)
        stock = Int(default_value=0)
    
    product = Field(ProductType)
    
    def mutate(self, info, name, price, stock=0):
        product_id = str(len(products) + 1)
        product = {
            'id': product_id,
            'name': name,
            'price': price,
            'stock': stock
        }
        products[product_id] = product
        return CreateProduct(product=product)

class UpdateProduct(graphene.Mutation):
    class Arguments:
        id = String(required=True)
        name = String()
        price = Float()
        stock = Int()
    
    product = Field(ProductType)
    
    def mutate(self, info, id, name=None, price=None, stock=None):
        if id not in products:
            raise Exception('Product not found')
        
        product = products[id]
        if name is not None:
            product['name'] = name
        if price is not None:
            product['price'] = price
        if stock is not None:
            product['stock'] = stock
        
        return UpdateProduct(product=product)

class Mutation(ObjectType):
    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()

# ===== SCHEMA =====

schema = Schema(query=Query, mutation=Mutation)

# ===== FLASK INTEGRATION =====

from flask import Flask
from flask_graphql import GraphQLView

app = Flask(__name__)
app.add_url_rule(
    '/graphql',
    view_func=GraphQLView.as_view(
        'graphql',
        schema=schema,
        graphiql=True  # GraphiQL IDE
    )
)

# Example GraphQL queries:
"""
query GetProducts {
  products(priceMin: 20, priceMax: 100, limit: 5) {
    id
    name
    price
    stock
  }
}

query GetOrder {
  order(id: "1") {
    id
    total
    items {
      product {
        name
        price
      }
      quantity
      subtotal
    }
  }
}

mutation CreateProduct {
  createProduct(name: "Monitor", price: 299.99, stock: 15) {
    product {
      id
      name
      price
    }
  }
}
"""

gRPC with Protocol Buffers:

// products.proto
syntax = "proto3";

package products;

service ProductService {
  rpc GetProduct (ProductRequest) returns (Product);
  rpc ListProducts (ListProductsRequest) returns (ProductList);
  rpc CreateProduct (CreateProductRequest) returns (Product);
  rpc StreamOrders (StreamOrdersRequest) returns (stream Order);
}

message Product {
  string id = 1;
  string name = 2;
  double price = 3;
  int32 stock = 4;
}

message ProductRequest {
  string id = 1;
}

message ListProductsRequest {
  int32 page = 1;
  int32 limit = 2;
  double price_min = 3;
  double price_max = 4;
}

message ProductList {
  repeated Product products = 1;
  int32 total = 2;
}

message CreateProductRequest {
  string name = 1;
  double price = 2;
  int32 stock = 3;
}

message Order {
  string id = 1;
  string user_id = 2;
  double total = 3;
  string status = 4;
}

message StreamOrdersRequest {
  string user_id = 1;
}
# Generate the Python code: python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. products.proto
# This produces products_pb2.py (messages) and products_pb2_grpc.py (service stubs)

import grpc
from concurrent import futures
import products_pb2
import products_pb2_grpc
import time

# ===== SERVER =====

class ProductServicer(products_pb2_grpc.ProductServiceServicer):
    """Implementación del servicio gRPC"""
    
    def GetProduct(self, request, context):
        """Unary RPC: 1 request → 1 response"""
        product_id = request.id
        
        if product_id not in products:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('Product not found')
            return products_pb2.Product()
        
        p = products[product_id]
        return products_pb2.Product(
            id=p['id'],
            name=p['name'],
            price=p['price'],
            stock=p['stock']
        )
    
    def ListProducts(self, request, context):
        """Unary RPC con filtros"""
        filtered = list(products.values())
        
        if request.price_min > 0:
            filtered = [p for p in filtered if p['price'] >= request.price_min]
        if request.price_max > 0:
            filtered = [p for p in filtered if p['price'] <= request.price_max]
        
        # Pagination
        start = (request.page - 1) * request.limit
        end = start + request.limit
        paginated = filtered[start:end]
        
        product_messages = [
            products_pb2.Product(
                id=p['id'],
                name=p['name'],
                price=p['price'],
                stock=p['stock']
            )
            for p in paginated
        ]
        
        return products_pb2.ProductList(
            products=product_messages,
            total=len(filtered)
        )
    
    def CreateProduct(self, request, context):
        """Unary RPC para crear producto"""
        product_id = str(len(products) + 1)
        product = {
            'id': product_id,
            'name': request.name,
            'price': request.price,
            'stock': request.stock
        }
        products[product_id] = product
        
        return products_pb2.Product(
            id=product_id,
            name=request.name,
            price=request.price,
            stock=request.stock
        )
    
    def StreamOrders(self, request, context):
        """Server-streaming RPC: 1 request → N responses"""
        user_id = request.user_id
        
        # Simulate a stream of orders
        for i in range(5):
            time.sleep(1)  # Simulate delay
            yield products_pb2.Order(
                id=f"order_{i}",
                user_id=user_id,
                total=99.99 * (i + 1),
                status='processing'
            )

def serve():
    """Iniciar servidor gRPC"""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    products_pb2_grpc.add_ProductServiceServicer_to_server(
        ProductServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("gRPC server started on port 50051")
    server.wait_for_termination()

# ===== CLIENT =====

def run_client():
    """Cliente gRPC"""
    channel = grpc.insecure_channel('localhost:50051')
    stub = products_pb2_grpc.ProductServiceStub(channel)
    
    # GetProduct
    response = stub.GetProduct(products_pb2.ProductRequest(id='1'))
    print(f"Product: {response.name}, ${response.price}")
    
    # ListProducts
    response = stub.ListProducts(products_pb2.ListProductsRequest(
        page=1,
        limit=10,
        price_min=20.0,
        price_max=100.0
    ))
    print(f"Found {response.total} products:")
    for product in response.products:
        print(f"  - {product.name}: ${product.price}")
    
    # CreateProduct
    new_product = stub.CreateProduct(products_pb2.CreateProductRequest(
        name='Monitor',
        price=299.99,
        stock=15
    ))
    print(f"Created product: {new_product.id}")
    
    # StreamOrders (server streaming)
    print("\nStreaming orders:")
    for order in stub.StreamOrders(products_pb2.StreamOrdersRequest(user_id='user_123')):
        print(f"  Order {order.id}: ${order.total} - {order.status}")

if __name__ == '__main__':
    # Run server in background, then client
    import threading
    server_thread = threading.Thread(target=serve, daemon=True)
    server_thread.start()
    time.sleep(2)
    run_client()

When to use each:

REST:

    • ✅ Public, resource-oriented APIs and straightforward CRUD
    • ✅ When HTTP caching and broad tooling support matter

GraphQL:

    • ✅ Clients with very different data needs (mobile vs web)
    • ✅ Avoiding over/under-fetching across nested resources

gRPC:

    • ✅ Internal service-to-service communication at high throughput
    • ✅ Streaming (server, client, or bidirectional) and strict typed contracts


Recipe 2.9: Resilience - Circuit Breakers and Retry

Circuit Breaker Pattern:

from enum import Enum
from datetime import datetime, timedelta
import time
import random

class CircuitState(Enum):
    CLOSED = "closed"      # Normal: requests flow through
    OPEN = "open"          # Failing: reject requests
    HALF_OPEN = "half_open"  # Testing: allow a few requests through

class CircuitBreaker:
    """
    Circuit Breaker para proteger contra fallos en cascada
    
    Estados:
    - CLOSED: Funcionando normal
    - OPEN: Demasiados fallos, rechazar requests
    - HALF_OPEN: Intentar recuperación gradual
    """
    
    def __init__(
        self,
        failure_threshold=5,     # Fallos antes de abrir
        timeout=60,              # Segundos en OPEN
        success_threshold=2      # Éxitos para cerrar desde HALF_OPEN
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        """Execute a function through the circuit breaker"""
        
        # If OPEN, check whether it's time to try HALF_OPEN
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("🔄 Circuit breaker: OPEN → HALF_OPEN (attempting recovery)")
            else:
                raise Exception(f"Circuit breaker is OPEN (retry in {self._time_until_retry()}s)")
        
        try:
            # Attempt the call
            result = func(*args, **kwargs)
            self._on_success()
            return result
        
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        """Handle a successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            print(f"✓ Success in HALF_OPEN ({self.success_count}/{self.success_threshold})")
            
            if self.success_count >= self.success_threshold:
                self._reset()
        else:
            # In CLOSED, reset the failure count
            self.failure_count = 0
    
    def _on_failure(self):
        """Handle a failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        print(f"✗ Failure ({self.failure_count}/{self.failure_threshold})")
        
        # A failure during HALF_OPEN sends the breaker straight back to OPEN
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
            print("⚠️  Circuit breaker: HALF_OPEN → OPEN (failure during recovery)")
        elif self.state == CircuitState.CLOSED and self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print("⚠️  Circuit breaker: CLOSED → OPEN (too many failures)")
    
    def _reset(self):
        """Resetear circuit breaker"""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        print("✓ Circuit breaker: HALF_OPEN → CLOSED (recovered)")
    
    def _should_attempt_reset(self):
        """Verificar si es tiempo de intentar recovery"""
        if not self.last_failure_time:
            return True
        
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout
    
    def _time_until_retry(self):
        """Tiempo restante hasta intentar recovery"""
        if not self.last_failure_time:
            return 0
        
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return max(0, self.timeout - elapsed)

# ===== RETRY WITH EXPONENTIAL BACKOFF =====

def retry_with_backoff(
    func,
    max_retries=3,
    initial_delay=1,
    max_delay=60,
    backoff_factor=2,
    exceptions=(Exception,)
):
    """
    Retry con exponential backoff y jitter
    
    Delays: 1s, 2s, 4s, 8s, ...
    Jitter: Agregar randomness para evitar thundering herd
    """
    delay = initial_delay
    
    for attempt in range(max_retries + 1):
        try:
            return func()
        
        except exceptions as e:
            if attempt == max_retries:
                print(f"❌ Failed after {max_retries + 1} attempts")
                raise
            
            # Calcular delay con exponential backoff + jitter
            delay = min(delay * backoff_factor, max_delay)
            jitter = random.uniform(0, delay * 0.1)  # 10% jitter
            sleep_time = delay + jitter
            
            print(f"⚠️  Attempt {attempt + 1} failed: {e}")
            print(f"   Retrying in {sleep_time:.2f}s...")
            time.sleep(sleep_time)

# ===== DEMONSTRATION =====

def unreliable_service():
    """Simular servicio que falla aleatoriamente"""
    if random.random() < 0.7:  # 70% chance de fallo
        raise Exception("Service unavailable")
    return "Success!"

print("=== Circuit Breaker Demo ===\n")

cb = CircuitBreaker(
    failure_threshold=3,
    timeout=5,
    success_threshold=2
)

# Make several calls
for i in range(20):
    try:
        result = cb.call(unreliable_service)
        print(f"✓ Request {i+1}: {result}\n")
    except Exception as e:
        print(f"✗ Request {i+1}: {e}\n")
    
    time.sleep(1)

print("\n=== Retry with Backoff Demo ===\n")

attempt_count = [0]

def flaky_service():
    """Servicio que falla las primeras 2 veces"""
    attempt_count[0] += 1
    if attempt_count[0] < 3:
        raise Exception(f"Temporary failure (attempt {attempt_count[0]})")
    return "Success after retries!"

try:
    result = retry_with_backoff(
        flaky_service,
        max_retries=5,
        initial_delay=0.5,
        backoff_factor=2
    )
    print(f"\n✓ Final result: {result}")
except Exception as e:
    print(f"\n❌ Final error: {e}")

Timeout and Fallback:

import signal
from contextlib import contextmanager

class OperationTimeoutError(Exception):
    """Raised on timeout (avoids shadowing the builtin TimeoutError)"""
    pass

@contextmanager
def timeout(seconds):
    """Context manager for a timeout (Unix only: relies on SIGALRM in the main thread)"""
    def timeout_handler(signum, frame):
        raise OperationTimeoutError(f"Operation timed out after {seconds}s")
    
    # Set signal handler
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

def call_with_fallback(primary_func, fallback_func, timeout_seconds=5):
    """Llamar función con fallback si falla o timeout"""
    try:
        with timeout(timeout_seconds):
            return primary_func()
    except Exception as e:  # OperationTimeoutError is an Exception too
        print(f"⚠️  Primary failed: {e}, using fallback")
        return fallback_func()

# Usage
def slow_external_api():
    time.sleep(10)  # Very slow
    return {"data": "from API"}

def cache_fallback():
    return {"data": "from cache (stale)"}

result = call_with_fallback(
    slow_external_api,
    cache_fallback,
    timeout_seconds=2
)
print(f"Result: {result}")

Congratulations! 🎉

You have completed PHASE 2: Systems Architecture and Design of the roadmap.

What you have learned:

✅ Monoliths vs microservices
✅ Event-Driven Architecture and CQRS
✅ CAP theorem and eventual consistency
✅ Load balancing and caching strategies
✅ Advanced SQL and optimization
✅ NoSQL (MongoDB, Redis)
✅ REST vs GraphQL vs gRPC
✅ Resilience with Circuit Breakers

Next steps:

PHASE 3: Infrastructure and DevOps


Version: 1.0
Date: 2024
Author: Roadmap del Desarrollador del Futuro
License: Educational use