🏗️ FASE 2: Arquitectura y Diseño de Sistemas

Recetas prácticas para diseñar sistemas escalables y robustos


📚 Tabla de Contenidos

  1. Receta 2.1: Monolito vs Microservicios - ¿Cuándo usar cada uno?
  2. Receta 2.2: Event-Driven Architecture
  3. Receta 2.3: CQRS y Event Sourcing
  4. Receta 2.4: CAP Theorem y Consistencia Eventual
  5. Receta 2.5: Load Balancing, Caching y Queues

Diseño de Software Escalable

Receta 2.1: Monolito vs Microservicios - ¿Cuándo usar cada uno?

¿Qué son?

Comparación:

| Característica       | Monolito           | Microservicios           |
|----------------------|--------------------|--------------------------|
| Complejidad inicial  | 🟢 Baja            | 🔴 Alta                  |
| Deployment           | Simple (1 deploy)  | Complejo (N deploys)     |
| Escalabilidad        | Vertical solamente | Horizontal por servicio  |
| Testing              | 🟢 Fácil E2E       | 🔴 Difícil E2E           |
| Desarrollo inicial   | 🟢 Rápido          | 🔴 Lento                 |
| Gestión              | Simple             | Requiere DevOps/SRE      |
| Debugging            | 🟢 Fácil           | 🔴 Trazas distribuidas   |
| Tecnología           | Stack único        | Poliglota                |

Caso de uso: eCommerce

# ===== MONOLITO =====
from flask import Flask, request, jsonify
from datetime import datetime
import sqlite3

# Single Flask app: every endpoint of the monolith lives in this one process.
app = Flask(__name__)

class MonolithEcommerce:
    """Monolithic e-commerce backend: users, products, orders and email
    notifications all live in one process sharing one database."""

    def __init__(self):
        # One shared connection for the whole app (classic monolith trait).
        self.db = sqlite3.connect('ecommerce.db', check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create every table in the single shared database."""
        cursor = self.db.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                email TEXT,
                name TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT,
                price REAL,
                stock INTEGER
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                total REAL,
                status TEXT,
                created_at TEXT
            )
        ''')
        self.db.commit()

    def create_order(self, user_id, product_ids):
        """Run the whole order flow in one place.

        Args:
            user_id: id of the buying user.
            product_ids: list of product ids; a repeated id means "buy
                that many units".
        Returns:
            (payload_dict, http_status) tuple.

        Fixes over the original: stock is validated against the cumulative
        quantity when a product id repeats (previously stock could go
        negative), the order id is captured right after the INSERT (the
        original read ``lastrowid`` after UPDATEs, where its value is not
        guaranteed), and the transaction commits before the email side
        effect so the order is durable even if the email step fails.
        """
        cursor = self.db.cursor()

        # 1. Validate the user.
        cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
        user = cursor.fetchone()
        if not user:
            return {'error': 'User not found'}, 404

        # 2. Aggregate duplicate ids into quantities, then price and
        #    validate stock per product.
        quantities = {}
        for product_id in product_ids:
            quantities[product_id] = quantities.get(product_id, 0) + 1

        total = 0
        for product_id, qty in quantities.items():
            cursor.execute('SELECT price, stock FROM products WHERE id = ?', (product_id,))
            product = cursor.fetchone()
            if not product or product[1] < qty:
                return {'error': f'Product {product_id} unavailable'}, 400
            total += product[0] * qty

        # 3. Create the order and capture its id immediately.
        cursor.execute('''
            INSERT INTO orders (user_id, total, status, created_at)
            VALUES (?, ?, ?, ?)
        ''', (user_id, total, 'pending', datetime.now().isoformat()))
        order_id = cursor.lastrowid

        # 4. Decrement stock by the ordered quantity.
        for product_id, qty in quantities.items():
            cursor.execute('''
                UPDATE products SET stock = stock - ? WHERE id = ?
            ''', (qty, product_id))

        # Commit before side effects so a failing email cannot lose the order.
        self.db.commit()

        # 5. Send the confirmation email (simulated).
        self._send_order_email(user[1], total)

        return {'order_id': order_id, 'total': total}, 201

    def _send_order_email(self, email, total):
        """Email delivery embedded in the monolith (simulated with a print)."""
        print(f"📧 Sending order confirmation to {email} for ${total}")

# Module-level singleton used by the route handler below.
monolith = MonolithEcommerce()

@app.route('/orders', methods=['POST'])
def create_order():
    """HTTP entry point: delegate order creation to the monolith core."""
    payload = request.json
    body, http_status = monolith.create_order(
        payload['user_id'],
        payload['product_ids']
    )
    return jsonify(body), http_status

# ===== MICROSERVICIOS =====
import requests
from flask import Flask

# Service 1: Users
user_service = Flask('users')
users_db = {}  # in-memory store; each microservice owns its own data

@user_service.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Look up a user by id; 404 when unknown."""
    found = users_db.get(user_id)
    if not found:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(found), 200

# Service 2: Products/Inventory
product_service = Flask('products')
# In-memory catalog seeded with demo data; owned exclusively by this service.
products_db = {
    1: {'name': 'Laptop', 'price': 999.99, 'stock': 10},
    2: {'name': 'Mouse', 'price': 29.99, 'stock': 50}
}

@product_service.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Return the product record, or 404 if the id is unknown."""
    record = products_db.get(product_id)
    if record:
        return jsonify(record), 200
    return jsonify({'error': 'Not found'}), 404

@product_service.route('/products/<int:product_id>/reserve', methods=['POST'])
def reserve_stock(product_id):
    """Reserve one unit of stock (inventory management)."""
    product = products_db.get(product_id)
    if not product or product['stock'] < 1:
        return jsonify({'error': 'Out of stock'}), 400

    product['stock'] -= 1
    return jsonify({'reserved': True}), 200

@product_service.route('/products/<int:product_id>/release', methods=['POST'])
def release_stock(product_id):
    """Release a previously reserved unit.

    This endpoint was missing even though OrderOrchestrator's saga rollback
    POSTs to ``/products/<id>/release``; without it every compensation
    silently got a 404 and reserved stock was never returned.
    """
    product = products_db.get(product_id)
    if not product:
        return jsonify({'error': 'Not found'}), 404

    product['stock'] += 1
    return jsonify({'released': True}), 200

# Service 3: Orders (orchestrator)
order_service = Flask('orders')
orders_db = {}  # in-memory order store owned by this service

class OrderOrchestrator:
    """Coordinates the order flow across user, product and notification
    services, compensating (saga pattern) when a step fails."""

    USER_SERVICE_URL = "http://localhost:5001"
    PRODUCT_SERVICE_URL = "http://localhost:5002"
    NOTIFICATION_SERVICE_URL = "http://localhost:5003"

    def create_order(self, user_id, product_ids):
        """Distributed order flow with compensating actions.

        Returns (payload_dict, http_status).

        Fix: the two bare ``except:`` clauses were narrowed to
        ``requests.RequestException`` — a bare except also swallows
        KeyboardInterrupt and SystemExit.
        """
        try:
            # 1. Validate the user against user-service.
            user_response = requests.get(
                f"{self.USER_SERVICE_URL}/users/{user_id}",
                timeout=2
            )
            if user_response.status_code != 200:
                return {'error': 'Invalid user'}, 400

            user = user_response.json()

            # 2. Price and reserve each product via product-service.
            total = 0
            reserved_products = []

            for product_id in product_ids:
                # Fetch the product.
                product_response = requests.get(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}",
                    timeout=2
                )
                if product_response.status_code != 200:
                    # Compensate: release anything already reserved.
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} not found'}, 400

                product = product_response.json()
                total += product['price']

                # Reserve stock.
                reserve_response = requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/reserve",
                    timeout=2
                )
                if reserve_response.status_code != 200:
                    # Compensate.
                    self._rollback_reservations(reserved_products)
                    return {'error': f'Product {product_id} out of stock'}, 400

                reserved_products.append(product_id)

            # 3. Record the order locally.
            order_id = len(orders_db) + 1
            orders_db[order_id] = {
                'id': order_id,
                'user_id': user_id,
                'products': product_ids,
                'total': total,
                'status': 'confirmed',
                'created_at': datetime.now().isoformat()
            }

            # 4. Notify (fire-and-forget: a mail failure must not fail the order).
            try:
                requests.post(
                    f"{self.NOTIFICATION_SERVICE_URL}/send",
                    json={
                        'email': user['email'],
                        'template': 'order_confirmation',
                        'data': {'order_id': order_id, 'total': total}
                    },
                    timeout=1
                )
            except requests.RequestException:
                # Covers Timeout/ConnectionError/etc. without hiding
                # interpreter-level exceptions like the old bare except did.
                print("⚠️  Notification failed, but order created")

            return {'order_id': order_id, 'total': total}, 201

        except requests.Timeout:
            return {'error': 'Service timeout'}, 504
        except requests.ConnectionError:
            return {'error': 'Service unavailable'}, 503

    def _rollback_reservations(self, product_ids):
        """Saga compensation: best-effort release of already-reserved stock."""
        for product_id in product_ids:
            try:
                requests.post(
                    f"{self.PRODUCT_SERVICE_URL}/products/{product_id}/release",
                    timeout=1
                )
            except requests.RequestException:
                print(f"⚠️  Failed to rollback product {product_id}")

# Shared orchestrator instance for the orders endpoint.
orchestrator = OrderOrchestrator()

@order_service.route('/orders', methods=['POST'])
def create_order_micro():
    """HTTP entry point that delegates to the saga orchestrator."""
    body = request.json
    response, code = orchestrator.create_order(
        body['user_id'],
        body['product_ids']
    )
    return jsonify(response), code

# Service 4: Notifications (async)
notification_service = Flask('notifications')

@notification_service.route('/send', methods=['POST'])
def send_notification():
    """Standalone notification endpoint (simulated delivery)."""
    payload = request.json
    print(f"📧 Sending {payload['template']} to {payload['email']}")
    # A real implementation would integrate SendGrid, SES, etc. here.
    return jsonify({'sent': True}), 200

Cuándo usar Monolito:

Cuándo usar Microservicios:

⚠️ La regla de oro:

“Start with a monolith, evolve to microservices when pain points are clear”
— Martin Fowler


Receta 2.2: Event-Driven Architecture

¿Qué es? Arquitectura donde componentes se comunican mediante eventos (mensajes) en lugar de llamadas síncronas directas.

Ventajas:

Componentes clave:

from dataclasses import dataclass
from datetime import datetime
from typing import List, Callable
import json
from queue import Queue
import threading

# ===== Event Bus =====
@dataclass
class Event:
    """Event record, treated as immutable by convention."""
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime
    metadata: dict = None

    def to_json(self):
        """Serialize to a plain, JSON-compatible dict."""
        payload = {
            'event_type': self.event_type,
            'aggregate_id': self.aggregate_id,
            'data': self.data,
            'timestamp': self.timestamp.isoformat(),
        }
        payload['metadata'] = self.metadata or {}
        return payload

class EventBus:
    """In-memory event bus (production: Kafka, RabbitMQ, SQS)."""

    def __init__(self):
        self.subscribers = {}  # {event_type: [handlers]}
        self.event_log = []    # append-only log of every published event

    def publish(self, event: "Event"):
        """Log the event and dispatch it to subscribers on worker threads."""
        print(f"📢 Published: {event.event_type}")
        self.event_log.append(event)

        for handler in self.subscribers.get(event.event_type, []):
            # In production this would go through a queue (async).
            # Fix: the original wrapped only Thread.start() in try/except, so
            # exceptions raised *inside* a handler escaped on the worker
            # thread and were never reported. The wrapper below catches them.
            # daemon=True so stray handlers cannot block interpreter exit.
            threading.Thread(
                target=self._safe_invoke, args=(handler, event), daemon=True
            ).start()

    @staticmethod
    def _safe_invoke(handler, event):
        """Invoke a handler, reporting (not propagating) its failure."""
        try:
            handler(event)
        except Exception as e:
            print(f"❌ Handler error: {e}")

    def subscribe(self, event_type: str, handler: Callable):
        """Register a handler for one event type."""
        self.subscribers.setdefault(event_type, []).append(handler)
        print(f"✓ Subscribed to {event_type}")

    def replay_events(self, aggregate_id: str):
        """Rebuild history for one aggregate from the log (Event Sourcing)."""
        return [e for e in self.event_log if e.aggregate_id == aggregate_id]

# ===== Example: order system =====
event_bus = EventBus()

# Servicio 1: Order Service (publica eventos)
class OrderService:
    """Owns orders and announces changes on the bus instead of calling
    other services directly."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}

    def create_order(self, order_id: str, user_id: str, items: List[dict]):
        """Create an order and publish an OrderCreated event."""
        grand_total = sum(entry['price'] * entry['qty'] for entry in items)
        new_order = {
            'id': order_id,
            'user_id': user_id,
            'items': items,
            'status': 'created',
            'total': grand_total
        }
        self.orders[order_id] = new_order

        # Publish the fact; interested services react on their own.
        created = Event(
            event_type='OrderCreated',
            aggregate_id=order_id,
            data=new_order,
            timestamp=datetime.now(),
            metadata={'user_id': user_id}
        )
        self.event_bus.publish(created)

        return new_order

    def confirm_payment(self, order_id: str):
        """Mark the order as paid and publish an OrderPaid event."""
        paid_order = self.orders[order_id]
        paid_order['status'] = 'paid'

        paid = Event(
            event_type='OrderPaid',
            aggregate_id=order_id,
            data={'order_id': order_id, 'total': paid_order['total']},
            timestamp=datetime.now()
        )
        self.event_bus.publish(paid)

# Servicio 2: Inventory Service (reacciona a eventos)
class InventoryService:
    """Reacts to order events by reserving stock."""

    def __init__(self, event_bus: "EventBus"):
        # Fix: keep the injected bus. The original published through the
        # module-level global `event_bus`, silently ignoring this argument
        # (broken dependency injection).
        self.event_bus = event_bus
        self.inventory = {
            'laptop': 10,
            'mouse': 50
        }

        # Subscribe to the events this service cares about.
        event_bus.subscribe('OrderCreated', self.handle_order_created)

    def handle_order_created(self, event: "Event"):
        """Reserve stock for each ordered item; publish success or failure."""
        print(f"\n🏪 InventoryService processing {event.event_type}")

        items = event.data['items']
        order_id = event.aggregate_id

        # Validate and reserve stock item by item.
        # NOTE(review): items reserved before a failing one are not released
        # on failure — a full saga would compensate here; preserved as-is.
        for item in items:
            product = item['product']
            qty = item['qty']

            if self.inventory.get(product, 0) < qty:
                # Publish the failure.
                self.event_bus.publish(Event(
                    event_type='InventoryReservationFailed',
                    aggregate_id=order_id,
                    data={'product': product, 'requested': qty},
                    timestamp=datetime.now()
                ))
                return

            self.inventory[product] -= qty

        # Success: publish the reservation.
        self.event_bus.publish(Event(
            event_type='InventoryReserved',
            aggregate_id=order_id,
            data={'items': items},
            timestamp=datetime.now()
        ))

# Servicio 3: Payment Service
class PaymentService:
    """Charges the customer once inventory has been reserved."""

    def __init__(self, event_bus: "EventBus"):
        # Fix: keep the injected bus. The original published through the
        # module-level global `event_bus`, ignoring this argument.
        self.event_bus = event_bus
        event_bus.subscribe('InventoryReserved', self.handle_inventory_reserved)

    def handle_inventory_reserved(self, event: "Event"):
        """Process the payment once stock is reserved; publish the outcome."""
        print(f"\n💳 PaymentService processing {event.event_type}")

        order_id = event.aggregate_id

        # Simulated payment processing (a real system would call Stripe, etc.).
        success = True

        if success:
            self.event_bus.publish(Event(
                event_type='PaymentSucceeded',
                aggregate_id=order_id,
                data={'payment_id': 'pay_123'},
                timestamp=datetime.now()
            ))
        else:
            self.event_bus.publish(Event(
                event_type='PaymentFailed',
                aggregate_id=order_id,
                data={'reason': 'Insufficient funds'},
                timestamp=datetime.now()
            ))

# Servicio 4: Notification Service
class NotificationService:
    """Sends user-facing notifications in response to domain events."""

    def __init__(self, event_bus: EventBus):
        # Wire each event type to its handler.
        subscriptions = (
            ('OrderCreated', self.send_order_confirmation),
            ('PaymentSucceeded', self.send_payment_confirmation),
            ('InventoryReservationFailed', self.send_failure_notification),
        )
        for event_type, handler in subscriptions:
            event_bus.subscribe(event_type, handler)

    def send_order_confirmation(self, event: Event):
        print(f"\n📧 Sending order confirmation for {event.aggregate_id}")

    def send_payment_confirmation(self, event: Event):
        print(f"\n📧 Sending payment confirmation for {event.aggregate_id}")

    def send_failure_notification(self, event: Event):
        print(f"\n📧 Sending failure notification for {event.aggregate_id}")

# ===== Demonstration =====
print("=== Event-Driven Architecture Demo ===\n")

# Initialize services; each subscribes to the events it cares about.
# NOTE(review): `order_service` / `notification_service` shadow the Flask
# apps of the microservices example above — confirm intent.
order_service = OrderService(event_bus)
inventory_service = InventoryService(event_bus)
payment_service = PaymentService(event_bus)
notification_service = NotificationService(event_bus)

# Create an order (this triggers the whole event chain).
import time
order = order_service.create_order(
    order_id='order_001',
    user_id='user_123',
    items=[
        {'product': 'laptop', 'price': 999, 'qty': 1},
        {'product': 'mouse', 'price': 29, 'qty': 2}
    ]
)

# Wait for handlers running on worker threads to finish (async dispatch).
time.sleep(0.5)

print("\n=== Event Log ===")
for event in event_bus.event_log:
    print(f"- {event.event_type} @ {event.timestamp.strftime('%H:%M:%S')}")

# Rebuild state from stored events (Event Sourcing).
print("\n=== Replay Events for order_001 ===")
events = event_bus.replay_events('order_001')
for event in events:
    print(f"- {event.event_type}: {event.data}")

Patrones de mensajería:

# 1. PUBLISH-SUBSCRIBE (1:N)
# One event, multiple consumers.
# NOTE(review): illustrative snippet — the three handler functions are not
# defined in this file.
event_bus.subscribe('UserRegistered', send_welcome_email)
event_bus.subscribe('UserRegistered', create_default_preferences)
event_bus.subscribe('UserRegistered', send_to_analytics)

# 2. POINT-TO-POINT (1:1)
# Task queue: one message, one consumer.
task_queue = Queue()
task_queue.put({'task': 'send_email', 'to': 'user@example.com'})

# Workers compete for tasks (process_task is not defined here).
def worker():
    while True:
        task = task_queue.get()
        process_task(task)
        task_queue.task_done()

# 3. REQUEST-REPLY (simulated synchronous)
# The client waits for a response.
def request_with_correlation():
    """Illustrate request/reply over an event bus via a correlation id."""
    correlation_id = 'req_123'

    # Publish the request.
    event_bus.publish(Event(
        event_type='GetUserRequest',
        aggregate_id=correlation_id,
        data={'user_id': '456'},
        timestamp=datetime.now()
    ))

    # Wait for the reply (normally with a timeout).
    # In production: use a dedicated reply queue.

Tecnologías para Event-Driven:


Receta 2.3: CQRS y Event Sourcing

¿Qué es CQRS? Command Query Responsibility Segregation: Separar modelos de escritura (Commands) y lectura (Queries).

¿Qué es Event Sourcing? Almacenar cambios de estado como secuencia de eventos en lugar del estado actual.

Caso de uso: Sistema bancario

from typing import List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

# ===== EVENT SOURCING =====
class EventType(Enum):
    """Closed set of domain events a bank account can emit."""
    ACCOUNT_CREATED = "account_created"
    MONEY_DEPOSITED = "money_deposited"
    MONEY_WITHDRAWN = "money_withdrawn"
    ACCOUNT_FROZEN = "account_frozen"

@dataclass
class DomainEvent:
    """Immutable domain event; the unit of storage in the event store."""
    event_id: str
    event_type: EventType
    aggregate_id: str  # Account ID
    data: dict
    # Capture time; naive local time (no tz) in this demo.
    timestamp: datetime = field(default_factory=datetime.now)
    # Per-aggregate sequence number, starting at 1.
    version: int = 1

class EventStore:
    """Append-only store of domain events (production: EventStoreDB, PostgreSQL)."""

    def __init__(self):
        # Production equivalent: an append-only table / stream.
        self.events = []

    def append(self, event: DomainEvent):
        """Persist one event; the log is never mutated, only appended."""
        self.events.append(event)
        print(f"✓ Event stored: {event.event_type.value} v{event.version}")

    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        """Every event belonging to one aggregate, in append order."""
        history = []
        for stored in self.events:
            if stored.aggregate_id == aggregate_id:
                history.append(stored)
        return history

    def get_events_from_version(self, aggregate_id: str, from_version: int):
        """Events for an aggregate at or after from_version (snapshot support)."""
        return [
            stored for stored in self.events
            if stored.aggregate_id == aggregate_id
            and stored.version >= from_version
        ]

# ===== WRITE MODEL (Commands) =====
class AccountAggregate:
    """Write-model aggregate whose state is rebuilt from its event stream.

    The build-apply-queue sequence that was copy-pasted across all three
    commands is factored into ``_record_event``.
    """

    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0
        self.status = 'inactive'      # becomes 'active' after ACCOUNT_CREATED
        self.version = 0              # version of the last applied event
        self.uncommitted_events = []  # raised but not yet persisted

    def create_account(self, owner: str, initial_deposit: float):
        """Command: create the account with an opening deposit."""
        if self.status != 'inactive':
            raise ValueError("Account already exists")
        self._record_event(
            EventType.ACCOUNT_CREATED,
            {'owner': owner, 'initial_deposit': initial_deposit}
        )

    def deposit(self, amount: float):
        """Command: deposit a positive amount."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._record_event(EventType.MONEY_DEPOSITED, {'amount': amount})

    def withdraw(self, amount: float):
        """Command: withdraw, guarding balance and frozen status."""
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        if self.status == 'frozen':
            raise ValueError("Account is frozen")
        self._record_event(EventType.MONEY_WITHDRAWN, {'amount': amount})

    def _record_event(self, event_type: "EventType", data: dict):
        """Build the next-version event, apply it, and queue it for persistence."""
        event = DomainEvent(
            event_id=f"evt_{self.version + 1}",
            event_type=event_type,
            aggregate_id=self.account_id,
            data=data,
            version=self.version + 1
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)

    def _apply_event(self, event: "DomainEvent"):
        """State transition for one event; also advances the version."""
        if event.event_type == EventType.ACCOUNT_CREATED:
            self.status = 'active'
            self.balance = event.data['initial_deposit']

        elif event.event_type == EventType.MONEY_DEPOSITED:
            self.balance += event.data['amount']

        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self.balance -= event.data['amount']

        elif event.event_type == EventType.ACCOUNT_FROZEN:
            self.status = 'frozen'

        self.version = event.version

    def load_from_history(self, events: "List[DomainEvent]"):
        """Rebuild state by replaying persisted events in order (Event Sourcing)."""
        for event in events:
            self._apply_event(event)

    def get_uncommitted_events(self):
        """Drain and return the events raised since the last persistence."""
        events = self.uncommitted_events
        self.uncommitted_events = []
        return events

# ===== READ MODEL (Queries) =====
class AccountReadModel:
    """Read-side projections, one per query shape (CQRS)."""

    def __init__(self):
        # Different projections serve different queries.
        self.accounts_by_id = {}
        self.accounts_by_balance = []  # kept sorted, highest balance first
        self.transaction_history = {}  # {account_id: [transactions]}

    def project_event(self, event: DomainEvent):
        """Fold one domain event into every projection."""
        account_id = event.aggregate_id

        if event.event_type == EventType.ACCOUNT_CREATED:
            self.accounts_by_id[account_id] = {
                'id': account_id,
                'owner': event.data['owner'],
                'balance': event.data['initial_deposit'],
                'status': 'active',
                'created_at': event.timestamp
            }
            self.transaction_history[account_id] = []

        elif event.event_type == EventType.MONEY_DEPOSITED:
            self._record_movement(account_id, 'deposit', event, +1)

        elif event.event_type == EventType.MONEY_WITHDRAWN:
            self._record_movement(account_id, 'withdrawal', event, -1)

        # Keep the balance-ordered projection in sync.
        self._rebuild_balance_index()

    def _record_movement(self, account_id, kind, event, sign):
        """Apply a signed balance change and log the transaction."""
        amount = event.data['amount']
        account = self.accounts_by_id[account_id]
        account['balance'] += sign * amount
        self.transaction_history[account_id].append({
            'type': kind,
            'amount': amount,
            'timestamp': event.timestamp,
            'balance_after': account['balance']
        })

    def _rebuild_balance_index(self):
        """Re-sort the balance projection (highest first)."""
        ordered = sorted(
            self.accounts_by_id.values(),
            key=lambda account: account['balance'],
            reverse=True
        )
        self.accounts_by_balance = ordered

    def get_account(self, account_id: str):
        """Query: account by ID - O(1)."""
        return self.accounts_by_id.get(account_id)

    def get_richest_accounts(self, limit: int = 10):
        """Query: top accounts by balance - O(1) via the precomputed index."""
        return self.accounts_by_balance[:limit]

    def get_transaction_history(self, account_id: str):
        """Query: transaction log - O(1)."""
        return self.transaction_history.get(account_id, [])

# ===== COMMAND HANDLER =====
class BankingCommandHandler:
    """Executes commands: load aggregate, run the command, persist + project.

    The load/commit boilerplate that was copy-pasted across all three
    handlers is factored into ``_load`` and ``_commit``.
    """

    def __init__(self, event_store: "EventStore", read_model: "AccountReadModel"):
        self.event_store = event_store
        self.read_model = read_model

    def _load(self, account_id: str):
        """Rehydrate the aggregate from its stored event history."""
        account = AccountAggregate(account_id)
        account.load_from_history(self.event_store.get_events(account_id))
        return account

    def _commit(self, account):
        """Persist uncommitted events and update the read model
        (eventual consistency between write and read sides)."""
        for event in account.get_uncommitted_events():
            self.event_store.append(event)
            self.read_model.project_event(event)

    def handle_create_account(self, account_id: str, owner: str, initial_deposit: float):
        """Command handler: create an account."""
        account = self._load(account_id)
        account.create_account(owner, initial_deposit)
        self._commit(account)

    def handle_deposit(self, account_id: str, amount: float):
        """Command handler: deposit."""
        account = self._load(account_id)
        account.deposit(amount)
        self._commit(account)

    def handle_withdraw(self, account_id: str, amount: float):
        """Command handler: withdraw."""
        account = self._load(account_id)
        account.withdraw(amount)
        self._commit(account)

# ===== QUERY HANDLER =====
class BankingQueryHandler:
    """Read-only query side of the banking CQRS split."""

    def __init__(self, read_model: AccountReadModel):
        self.read_model = read_model

    def get_account_balance(self, account_id: str):
        """Query: current balance, or None when the account is unknown."""
        found = self.read_model.get_account(account_id)
        if not found:
            return None
        return found['balance']

    def get_account_details(self, account_id: str):
        """Query: full projected record for one account."""
        return self.read_model.get_account(account_id)

    def get_top_balances(self):
        """Query: the ten largest accounts by balance."""
        return self.read_model.get_richest_accounts(10)

    def get_transactions(self, account_id: str):
        """Query: chronological transaction history."""
        return self.read_model.get_transaction_history(account_id)

# ===== DEMONSTRATION =====
print("=== CQRS + Event Sourcing Demo ===\n")

# Wire the write side (event store + command handler) and the read side
# (projections + query handler).
event_store = EventStore()
read_model = AccountReadModel()
command_handler = BankingCommandHandler(event_store, read_model)
query_handler = BankingQueryHandler(read_model)

# COMMANDS (Write)
print("--- COMMANDS ---")
command_handler.handle_create_account('ACC001', 'Alice', 1000.0)
command_handler.handle_deposit('ACC001', 500.0)
command_handler.handle_withdraw('ACC001', 200.0)
command_handler.handle_deposit('ACC001', 1000.0)

command_handler.handle_create_account('ACC002', 'Bob', 5000.0)
command_handler.handle_deposit('ACC002', 2000.0)

# QUERIES (Read) — served from projections, never from the event store.
print("\n--- QUERIES ---")
balance = query_handler.get_account_balance('ACC001')
print(f"Alice's balance: ${balance}")

details = query_handler.get_account_details('ACC001')
print(f"Alice's account: {details}")

top_accounts = query_handler.get_top_balances()
print(f"\nTop accounts:")
for acc in top_accounts:
    print(f"  {acc['owner']}: ${acc['balance']}")

transactions = query_handler.get_transactions('ACC001')
print(f"\nAlice's transactions:")
for tx in transactions:
    print(f"  {tx['type']}: ${tx['amount']} -> Balance: ${tx['balance_after']}")

# EVENT SOURCING: rebuild current state purely from the stored events.
print("\n--- EVENT SOURCING: Rebuild from history ---")
new_account = AccountAggregate('ACC001')
events = event_store.get_events('ACC001')
new_account.load_from_history(events)
print(f"Rebuilt balance: ${new_account.balance} (version {new_account.version})")

print(f"\nTotal events stored: {len(event_store.events)}")

Ventajas de CQRS + Event Sourcing:

Desventajas:


Receta 2.4: CAP Theorem y Consistencia Eventual

CAP Theorem: En un sistema distribuido, solo puedes garantizar 2 de 3:

  - **C**onsistency (Consistencia): todos los nodos ven los mismos datos al mismo tiempo.
  - **A**vailability (Disponibilidad): toda petición recibe una respuesta, aunque no sea la más reciente.
  - **P**artition tolerance (Tolerancia a particiones): el sistema sigue operando aunque se pierdan mensajes entre nodos.

Trade-offs:

| Sistema | Prioriza                      | Ejemplo                            |
|---------|-------------------------------|------------------------------------|
| CP      | Consistencia + Particiones    | Bancos tradicionales, Zookeeper    |
| AP      | Disponibilidad + Particiones  | DNS, Cassandra, DynamoDB           |
| CA      | Consistencia + Disponibilidad | RDBMS single-node (no distribuido) |

Implementación de consistencia eventual:

import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class WriteOperation:
    """A single write, tagged for replication and conflict ordering."""
    key: str
    # Fix: the original annotated `value: any`, which is the builtin
    # function `any`, not a type; `typing.Any` is the intended annotation.
    value: Any
    timestamp: float  # epoch seconds at the originating node
    node_id: str      # node that accepted the write

class EventuallyConsistentStore:
    """Distributed key-value store con consistencia eventual (AP)"""
    
    def __init__(self, node_id: str, replication_factor: int = 3):
        self.node_id = node_id
        self.replication_factor = replication_factor
        self.data: Dict[str, any] = {}
        self.vector_clock: Dict[str, int] = {}  # Lamport clock
        self.write_log: List[WriteOperation] = []
        self.peer_nodes = []
        self.lock = threading.Lock()
    
    def connect_to_peer(self, peer_node):
        """Conectar a otro nodo (cluster)"""
        self.peer_nodes.append(peer_node)
        print(f"✓ {self.node_id} connected to {peer_node.node_id}")
    
    def write(self, key: str, value: any):
        """Escribir con consistencia eventual"""
        with self.lock:
            # 1. Escribir localmente (disponibilidad)
            self.data[key] = value
            self.vector_clock[key] = self.vector_clock.get(key, 0) + 1
            
            operation = WriteOperation(
                key=key,
                value=value,
                timestamp=time.time(),
                node_id=self.node_id
            )
            self.write_log.append(operation)
            
            print(f"✓ {self.node_id}: Wrote {key}={value} locally")
        
        # 2. Replicar asíncronamente (eventual consistency)
        self._replicate_async(operation)
        
        return True
    
    def _replicate_async(self, operation: WriteOperation):
        """Replicar a otros nodos asíncronamente"""
        def replicate():
            time.sleep(0.1)  # Simular network delay
            
            for peer in self.peer_nodes[:self.replication_factor - 1]:
                try:
                    peer._receive_replication(operation)
                except Exception as e:
                    print(f"⚠️  Replication failed to {peer.node_id}: {e}")
        
        threading.Thread(target=replicate, daemon=True).start()
    
    def _receive_replication(self, operation: WriteOperation):
        """Recibir replicación de otro nodo"""
        with self.lock:
            current_version = self.vector_clock.get(operation.key, 0)
            
            # Resolver conflictos: Last-Write-Wins usando timestamp
            if operation.key not in self.data or operation.timestamp > self.write_log[-1].timestamp:
                self.data[operation.key] = operation.value
                self.vector_clock[operation.key] = current_version + 1
                print(f"✓ {self.node_id}: Replicated {operation.key}={operation.value} from {operation.node_id}")
    
    def read(self, key: str, consistency_level='ONE'):
        """
        Leer con diferentes niveles de consistencia
        
        ONE: Leer de un nodo (más rápido, puede ser stale)
        QUORUM: Leer de mayoría (balance)
        ALL: Leer de todos (más consistente, más lento)
        """
        if consistency_level == 'ONE':
            # Lectura local (rápida pero eventual)
            value = self.data.get(key)
            print(f"🔍 {self.node_id}: Read {key}={value} (ONE)")
            return value
        
        elif consistency_level == 'QUORUM':
            # Leer de mayoría de nodos
            quorum_size = (len(self.peer_nodes) + 1) // 2 + 1
            responses = [self.data.get(key)]
            
            for peer in self.peer_nodes[:quorum_size - 1]:
                responses.append(peer.data.get(key))
            
            # Resolver: tomar valor más común
            value = max(set(responses), key=responses.count)
            print(f"🔍 {self.node_id}: Read {key}={value} (QUORUM)")
            return value
        
        elif consistency_level == 'ALL':
            # Leer de todos (strong consistency)
            responses = [self.data.get(key)]
            
            for peer in self.peer_nodes:
                responses.append(peer.data.get(key))
            
            # Verificar que todos tienen el mismo valor
            if len(set(responses)) > 1:
                print(f"⚠️  Inconsistency detected for {key}: {set(responses)}")
            
            value = responses[0]
            print(f"🔍 {self.node_id}: Read {key}={value} (ALL)")
            return value
    
    def get_stats(self):
        """Estadísticas del nodo"""
        return {
            'node_id': self.node_id,
            'keys': len(self.data),
            'writes': len(self.write_log),
            'data': dict(self.data)
        }

# ===== DEMONSTRATION =====
print("=== Eventual Consistency Demo ===\n")

# Build a 3-node cluster.
node1 = EventuallyConsistentStore('Node1')
node2 = EventuallyConsistentStore('Node2')
node3 = EventuallyConsistentStore('Node3')
cluster = [node1, node2, node3]

# Fully connect the cluster: every node replicates to every other node.
for source in cluster:
    for target in cluster:
        if source is not target:
            source.connect_to_peer(target)

print("\n--- Scenario 1: Normal Operation ---")
node1.write('user:123', {'name': 'Alice', 'age': 30})
time.sleep(0.3)  # Give async replication time to land

# The key is now readable from every node (eventually consistent).
for node in cluster:
    node.read('user:123', 'ONE')

print("\n--- Scenario 2: Read Before Replication ---")
node1.write('user:456', {'name': 'Bob', 'age': 25})
node2.read('user:456', 'ONE')  # May not have arrived here yet
time.sleep(0.3)
node2.read('user:456', 'ONE')  # Replicated by now

print("\n--- Scenario 3: Conflicting Writes ---")
# Concurrent writes to the same key on different nodes.
node1.write('user:789', {'name': 'Charlie', 'version': 1})
node2.write('user:789', {'name': 'Charlie Updated', 'version': 2})
time.sleep(0.5)  # Wait for convergence

# Last-Write-Wins: the most recent value prevails on every replica.
print("\nAfter convergence:")
for node in cluster:
    value = node.read('user:789', 'ONE')

print("\n--- Node Stats ---")
for node in cluster:
    print(f"{node.node_id}: {node.get_stats()}")

Estrategias de resolución de conflictos:

class ConflictResolution:
    """Catalogue of strategies for reconciling concurrent conflicting writes."""
    
    @staticmethod
    def last_write_wins(values_with_timestamps):
        """LWW: keep the value carrying the highest timestamp."""
        winner = max(values_with_timestamps, key=lambda entry: entry['timestamp'])
        return winner
    
    @staticmethod
    def first_write_wins(values_with_timestamps):
        """FWW: keep the value carrying the lowest timestamp."""
        winner = min(values_with_timestamps, key=lambda entry: entry['timestamp'])
        return winner
    
    @staticmethod
    def merge_sets(values):
        """For set-valued data: the union of every replica's contents."""
        merged = set()
        for replica in values:
            merged |= set(replica)
        return merged
    
    @staticmethod
    def custom_merge(values, merge_function):
        """Delegate reconciliation to an application-supplied merge function."""
        return merge_function(values)
    
    @staticmethod
    def vector_clock_resolution(values_with_clocks):
        """Return the first value whose vector clock no other value dominates.

        Simplified causality check: a value is dominated when some *different*
        value's clock is >= on every component the value carries. If every
        value is dominated (a genuine concurrent conflict), fall back to the
        first element.
        """
        for candidate in values_with_clocks:
            dominated = any(
                candidate != other
                and all(
                    candidate['clock'][component] <= other['clock'].get(component, 0)
                    for component in candidate['clock']
                )
                for other in values_with_clocks
            )
            if not dominated:
                # No other value causally supersedes this one.
                return candidate
        
        # Every value dominated: real conflict, use the fallback strategy.
        return values_with_clocks[0]

# Example: shopping cart reconciled with an application-level merge
class ShoppingCart:
    """Shopping cart designed to tolerate eventual consistency."""
    
    @staticmethod
    def merge_carts(cart1: set, cart2: set):
        """Merge two cart replicas by taking the union of their items.

        ADD-WINS semantics: an item added on either replica is never lost.
        """
        return {*cart1, *cart2}

cart_node1 = {'item1', 'item2'}
cart_node2 = {'item2', 'item3'}
merged = ShoppingCart.merge_carts(cart_node1, cart_node2)
print(f"Merged cart: {merged}")  # union: item1, item2, item3

Cuándo usar consistencia eventual:

- Datos que toleran lecturas ligeramente desactualizadas: feeds sociales, contadores de likes, catálogos, carritos de compra.
- Sistemas que priorizan disponibilidad y baja latencia a escala global.

Cuándo NO usar:

- Operaciones que exigen consistencia fuerte: saldos bancarios, inventario crítico, reservas de asientos.
- Flujos donde leer un valor obsoleto produce decisiones incorrectas o dobles cargos.


Receta 2.5: Load Balancing y Caching

Load Balancing: Distribuir tráfico entre servidores

from typing import List
from dataclasses import dataclass
import hashlib
import random

@dataclass
class Server:
    """A backend server as seen by the load balancer."""
    id: str
    host: str
    port: int
    weight: int = 1              # Relative capacity, used by weighted round-robin
    health: str = 'healthy'      # 'healthy' or 'unhealthy'
    active_connections: int = 0  # Used by least-connections

class LoadBalancer:
    """Load balancer offering several classic distribution algorithms."""
    
    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.round_robin_index = 0
        self.request_count = 0
    
    def _healthy_servers(self) -> List[Server]:
        """Return the servers currently marked healthy; raise if none remain.

        Centralizes the health filter + empty-pool guard that every selection
        algorithm needs (previously copy-pasted into each method).

        Raises:
            RuntimeError: when no server is healthy (subclass of Exception,
            so existing `except Exception` callers are unaffected).
        """
        healthy = [s for s in self.servers if s.health == 'healthy']
        if not healthy:
            raise RuntimeError("No healthy servers")
        return healthy
    
    def get_server_round_robin(self) -> Server:
        """Round Robin: rotate through healthy servers evenly."""
        healthy = self._healthy_servers()
        server = healthy[self.round_robin_index % len(healthy)]
        self.round_robin_index += 1
        return server
    
    def get_server_weighted_round_robin(self) -> Server:
        """Weighted Round Robin: higher-weight servers receive more requests."""
        healthy = self._healthy_servers()
        
        # Expand each server `weight` times, then rotate over the expansion.
        weighted = []
        for server in healthy:
            weighted.extend([server] * server.weight)
        
        server = weighted[self.round_robin_index % len(weighted)]
        self.round_robin_index += 1
        return server
    
    def get_server_least_connections(self) -> Server:
        """Least Connections: pick the server with the fewest active connections."""
        return min(self._healthy_servers(), key=lambda s: s.active_connections)
    
    def get_server_ip_hash(self, client_ip: str) -> Server:
        """IP Hash: the same client always lands on the same server (sticky sessions)."""
        healthy = self._healthy_servers()
        
        # Deterministic hash of the client IP (md5 chosen for distribution,
        # not security — flagged here to preempt review questions).
        hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
        return healthy[hash_value % len(healthy)]
    
    def get_server_random(self) -> Server:
        """Random: pick any healthy server."""
        return random.choice(self._healthy_servers())
    
    def health_check(self):
        """Refresh server health (simulated with a 10% random failure rate)."""
        for server in self.servers:
            # In production this would be an HTTP/TCP health probe.
            if random.random() < 0.1:
                server.health = 'unhealthy'
                print(f"⚠️  {server.id} is unhealthy")
            else:
                server.health = 'healthy'

# ===== CACHING STRATEGIES =====
import time
from collections import OrderedDict

class LRUCache:
    """Least Recently Used cache with hit/miss accounting."""
    
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity
        self.hits = 0
        self.misses = 0
    
    def get(self, key: str):
        """Return the cached value for `key`, or None on a miss."""
        try:
            value = self.cache[key]
        except KeyError:
            self.misses += 1
            return None
        # A hit refreshes recency: move the entry to the MRU end.
        self.cache.move_to_end(key)
        self.hits += 1
        return value
    
    def put(self, key: str, value):
        """Insert or refresh an entry, evicting the LRU entry when full."""
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.capacity:
            # Evict from the LRU end (front of the OrderedDict).
            self.cache.popitem(last=False)
        self.cache[key] = value
    
    def get_stats(self):
        """Return hit/miss counters and the current hit rate."""
        lookups = self.hits + self.misses
        rate = self.hits / lookups * 100 if lookups else 0
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': f'{rate:.1f}%',
            'size': len(self.cache),
        }

class CacheAside:
    """Cache-Aside (lazy loading) pattern: the application manages the cache."""
    
    def __init__(self, cache: LRUCache, database):
        self.cache = cache
        self.database = database
    
    def get(self, key: str):
        """Look up `key`, consulting the cache first and the DB on a miss.

        Cache hit  -> return immediately.
        Cache miss -> load from the DB, populate the cache, then return.
        """
        cached = self.cache.get(key)
        if cached is not None:
            print(f"✓ Cache HIT for {key}")
            return cached
        
        print(f"✗ Cache MISS for {key}, fetching from DB...")
        time.sleep(0.1)  # Simulated DB latency
        fetched = self.database.get(key)
        
        # Populate the cache so the next read of this key is a hit.
        if fetched is not None:
            self.cache.put(key, fetched)
        
        return fetched
    
    def update(self, key: str, value):
        """Write-Through: persist to the DB and refresh the cache in one step."""
        self.database[key] = value
        self.cache.put(key, value)

class WriteBackCache:
    """Write-Back pattern: writes land in the cache immediately and are
    persisted to the database asynchronously by a background flusher thread.

    Fixes vs. the original:
    - dirty-key bookkeeping is now guarded by a lock and swapped atomically;
      previously a key written between the flush loop and
      `dirty_keys.clear()` was silently dropped (never persisted).
    - the flusher reads the backing OrderedDict directly instead of calling
      `LRUCache.get`, which skewed hit/miss stats and LRU recency order.

    NOTE(review): a dirty key evicted from the LRU before the next flush is
    still lost (its value is gone). A production write-back cache must pin
    dirty entries or keep their values separately — out of scope here.
    """
    
    def __init__(self, cache: LRUCache, database, flush_interval=5):
        import threading
        
        self.cache = cache
        self.database = database
        self.dirty_keys = set()               # Keys written but not yet persisted
        self.flush_interval = flush_interval  # Seconds between background flushes
        self._dirty_lock = threading.Lock()   # Guards dirty_keys
        
        # Daemon flusher: dies with the process, flushing on a fixed interval.
        self.flusher_thread = threading.Thread(target=self._flush_periodically, daemon=True)
        self.flusher_thread.start()
    
    def write(self, key: str, value):
        """Write to the cache only; the DB write happens at the next flush."""
        self.cache.put(key, value)
        with self._dirty_lock:
            self.dirty_keys.add(key)
        print(f"✓ Wrote {key} to cache (DB write pending)")
    
    def _flush_periodically(self):
        """Background loop: flush dirty keys every `flush_interval` seconds."""
        while True:
            time.sleep(self.flush_interval)
            self._flush()
    
    def _flush(self):
        """Persist all currently-dirty keys to the database."""
        # Swap out the dirty set under the lock so concurrent writes land in
        # a fresh set and are picked up by the NEXT flush instead of lost.
        with self._dirty_lock:
            pending = self.dirty_keys
            self.dirty_keys = set()
        
        if pending:
            print(f"\n🔄 Flushing {len(pending)} keys to DB...")
            for key in pending:
                # Read the raw store: don't disturb LRU order or hit stats.
                value = self.cache.cache.get(key)
                if value is not None:
                    self.database[key] = value
            
            print("✓ Flush complete")

# ===== CACHE INVALIDATION =====
class CacheInvalidation:
    """Cache-invalidation strategies."""
    
    @staticmethod
    def ttl_based(cache_entry, ttl_seconds=300):
        """Time-To-Live: report whether the entry is older than `ttl_seconds`."""
        return time.time() - cache_entry['timestamp'] > ttl_seconds
    
    @staticmethod
    def event_based(cache, event_type, affected_keys):
        """Event-based: drop specific keys when their underlying data changes."""
        for affected in affected_keys:
            cache.cache.pop(affected, None)
        print(f"✓ Invalidated {len(affected_keys)} keys due to {event_type}")
    
    @staticmethod
    def tag_based(cache, tags):
        """Tag-based: invalidate whole categories of entries at once.

        Entries would carry tags, e.g. {'key': 'user:123', 'tags': ['users', 'profile']},
        and invalidating the 'users' tag would drop every entry carrying it.
        (Intentionally left unimplemented in this recipe.)
        """
        pass

# ===== DEMONSTRATION =====
print("=== Load Balancing Demo ===\n")

# Three backends with decreasing capacity (weight).
servers = [
    Server('server1', '10.0.0.1', 8080, weight=3),
    Server('server2', '10.0.0.2', 8080, weight=2),
    Server('server3', '10.0.0.3', 8080, weight=1),
]

lb = LoadBalancer(servers)

# Round Robin
print("--- Round Robin ---")
for i in range(6):
    server = lb.get_server_round_robin()
    # Fixed: request number and server id were fused in the original output.
    print(f"Request {i+1} -> {server.id}")

# Weighted Round Robin
print("\n--- Weighted Round Robin ---")
lb.round_robin_index = 0  # Restart the rotation for a clean comparison
for i in range(6):
    server = lb.get_server_weighted_round_robin()
    print(f"Request {i+1} -> {server.id}")

# IP Hash (sticky sessions): repeated clients hit the same backend.
print("\n--- IP Hash (Sticky Sessions) ---")
clients = ['192.168.1.1', '192.168.1.2', '192.168.1.1', '192.168.1.3', '192.168.1.1']
for i, client_ip in enumerate(clients):
    server = lb.get_server_ip_hash(client_ip)
    print(f"Request {i+1} from {client_ip} -> {server.id}")

print("\n=== Caching Demo ===\n")

# Backing "database" for the cache demos.
database = {
    'user:1': {'name': 'Alice', 'age': 30},
    'user:2': {'name': 'Bob', 'age': 25},
    'user:3': {'name': 'Charlie', 'age': 35}
}

cache = LRUCache(capacity=2)
cache_aside = CacheAside(cache, database)

# Cache-Aside pattern: capacity 2 forces an eviction on the third key.
print("--- Cache-Aside Pattern ---")
cache_aside.get('user:1')  # MISS
cache_aside.get('user:1')  # HIT
cache_aside.get('user:2')  # MISS
cache_aside.get('user:3')  # MISS (evicts user:1)
cache_aside.get('user:1')  # MISS again

print(f"\nCache stats: {cache.get_stats()}")

# Write-Back pattern: writes land in the cache now, in the DB at next flush.
print("\n--- Write-Back Pattern ---")
wb_cache = WriteBackCache(LRUCache(10), {})
wb_cache.write('config:theme', 'dark')
wb_cache.write('config:language', 'es')
print("Waiting for flush...")
time.sleep(6)  # flush_interval is 5s, so one flush is guaranteed
print(f"DB contents: {wb_cache.database}")

Estrategias de caching por capa:

| Capa | Ejemplo | TTL típico |
|------|---------|------------|
| CDN | Cloudflare, Akamai | Horas/Días |
| Application | Redis, Memcached | Minutos/Horas |
| Database | Query cache | Segundos/Minutos |
| Browser | Local storage | Días/Semanas |