🏗️ FASE 2: Arquitectura y Diseño de Sistemas

Recetas prácticas para diseñar sistemas escalables y robustos


📚 Tabla de Contenidos

  1. Receta 2.6: SQL Avanzado - Optimización
  2. Receta 2.7: NoSQL - Cuándo y cómo usar

Bases de Datos

Receta 2.6: SQL Avanzado - Optimización

Índices: El arma secreta de performance

-- ===== INDEXES =====

-- Example table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE,
    name VARCHAR(100),
    age INTEGER,
    city VARCHAR(100),
    created_at TIMESTAMP DEFAULT NOW()
);

-- ❌ WITHOUT INDEX: full table scan - O(n)
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Seq Scan on users (cost=0.00..1234.00 rows=1)

-- ✅ WITH INDEX: index scan - O(log n)
CREATE INDEX idx_users_email ON users(email);

EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Index Scan using idx_users_email (cost=0.42..8.44 rows=1)

-- COMPOSITE INDEXES (multiple columns)
CREATE INDEX idx_users_city_age ON users(city, age);

-- ✅ GOOD: uses the index (the leading column `city` is in the predicate)
SELECT * FROM users WHERE city = 'NYC' AND age > 25;

-- ❌ BAD: a B-tree index on (city, age) cannot be used efficiently when
-- only the non-leading column is filtered
SELECT * FROM users WHERE age > 25;  -- no index used (age is not the leading column)

-- ❌ BAD: wrapping the column in a function prevents use of the plain index
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- ✅ SOLUTION: expression (functional) index
CREATE INDEX idx_users_email_lower ON users(LOWER(email));
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- PARTIAL INDEX (covers only a subset of rows, so it is smaller)
CREATE INDEX idx_active_users ON users(city) WHERE created_at > '2024-01-01';

-- Useful for queries that filter on recent data
SELECT * FROM users WHERE city = 'LA' AND created_at > '2024-01-01';

-- COVERING INDEX (INCLUDE stores extra columns in the index; PostgreSQL 11+)
CREATE INDEX idx_users_city_covering ON users(city) INCLUDE (name, age);

-- Query satisfied entirely from the index (index-only scan, no heap read)
SELECT name, age FROM users WHERE city = 'Boston';

Window Functions: Análisis poderoso

-- ===== WINDOW FUNCTIONS =====

CREATE TABLE sales (
    id SERIAL PRIMARY KEY,
    salesperson VARCHAR(100),
    region VARCHAR(50),
    amount DECIMAL(10, 2),
    sale_date DATE
);

-- ROW_NUMBER: number rows sequentially (ties broken arbitrarily, no gaps)
SELECT 
    salesperson,
    amount,
    ROW_NUMBER() OVER (ORDER BY amount DESC) as rank
FROM sales;

-- RANK with ties (RANK leaves gaps after ties, DENSE_RANK does not)
SELECT 
    salesperson,
    amount,
    RANK() OVER (ORDER BY amount DESC) as rank,
    DENSE_RANK() OVER (ORDER BY amount DESC) as dense_rank
FROM sales;

-- PARTITION BY: restart the window for each group
SELECT 
    salesperson,
    region,
    amount,
    RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region
FROM sales;

-- Running totals (cumulative sum)
SELECT 
    sale_date,
    amount,
    SUM(amount) OVER (ORDER BY sale_date) as running_total
FROM sales
ORDER BY sale_date;

-- Moving average (7-day window: current row plus the 6 preceding rows)
SELECT 
    sale_date,
    amount,
    AVG(amount) OVER (
        ORDER BY sale_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7days
FROM sales;

-- LAG and LEAD: access previous/next rows without a self-join
SELECT 
    sale_date,
    amount,
    LAG(amount, 1) OVER (ORDER BY sale_date) as prev_day,
    LEAD(amount, 1) OVER (ORDER BY sale_date) as next_day,
    amount - LAG(amount, 1) OVER (ORDER BY sale_date) as day_over_day_change
FROM sales;

-- NTILE: split rows into quartiles/percentiles
SELECT 
    salesperson,
    amount,
    NTILE(4) OVER (ORDER BY amount) as quartile
FROM sales;

Common Table Expressions (CTEs): Queries legibles

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Setup: engine + session factory shared by the examples below
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
session = Session()

def demonstrate_ctes():
    """Build three example CTE queries: simple, recursive, and chained.

    Only the last query (``complex_query``) is executed; ``query`` and
    ``org_chart_query`` are built but never run — they are kept as
    readable examples.

    Returns:
        list: rows of (month, total_sales, moving_avg_3months, growth_rate).
    """
    
    # ===== SIMPLE CTE =====
    # NOTE(review): built but never executed — intentional example?
    query = text("""
        WITH high_value_customers AS (
            SELECT 
                customer_id,
                SUM(order_total) as lifetime_value
            FROM orders
            GROUP BY customer_id
            HAVING SUM(order_total) > 10000
        )
        SELECT 
            c.name,
            c.email,
            hvc.lifetime_value
        FROM customers c
        JOIN high_value_customers hvc ON c.id = hvc.customer_id
        ORDER BY hvc.lifetime_value DESC;
    """)
    
    # ===== RECURSIVE CTE: hierarchies =====
    # NOTE(review): also never executed below.
    org_chart_query = text("""
        -- Tabla de empleados con manager_id
        WITH RECURSIVE employee_hierarchy AS (
            -- Caso base: CEO (sin manager)
            SELECT 
                id,
                name,
                manager_id,
                1 as level,
                name as path
            FROM employees
            WHERE manager_id IS NULL
            
            UNION ALL
            
            -- Caso recursivo: empleados con managers
            SELECT 
                e.id,
                e.name,
                e.manager_id,
                eh.level + 1,
                eh.path || ' > ' || e.name
            FROM employees e
            JOIN employee_hierarchy eh ON e.manager_id = eh.id
        )
        SELECT 
            level,
            name,
            path
        FROM employee_hierarchy
        ORDER BY level, name;
    """)
    
    # ===== MULTIPLE CHAINED CTEs =====
    complex_query = text("""
        WITH 
        -- CTE 1: Ventas mensuales
        monthly_sales AS (
            SELECT 
                DATE_TRUNC('month', sale_date) as month,
                SUM(amount) as total_sales
            FROM sales
            GROUP BY DATE_TRUNC('month', sale_date)
        ),
        -- CTE 2: Promedio móvil
        sales_with_avg AS (
            SELECT 
                month,
                total_sales,
                AVG(total_sales) OVER (
                    ORDER BY month
                    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
                ) as moving_avg_3months
            FROM monthly_sales
        ),
        -- CTE 3: Growth rate
        sales_with_growth AS (
            SELECT 
                month,
                total_sales,
                moving_avg_3months,
                (total_sales - LAG(total_sales) OVER (ORDER BY month)) / 
                    LAG(total_sales) OVER (ORDER BY month) * 100 as growth_rate
            FROM sales_with_avg
        )
        SELECT * FROM sales_with_growth
        WHERE growth_rate IS NOT NULL
        ORDER BY month DESC;
    """)
    
    # Only the chained-CTE query is actually executed.
    results = session.execute(complex_query)
    return results.fetchall()

# ===== QUERY OPTIMIZATION =====
def query_optimization_tips():
    """Contrast slow query patterns with their faster equivalents.

    Each pair runs the bad and the good version against the session;
    results are discarded — the function exists for illustration only.
    """
    
    # ❌ BAD: N+1 queries
    users = session.execute(text("SELECT * FROM users")).fetchall()
    for user in users:
        # One extra query per user (N round-trips)
        orders = session.execute(
            text("SELECT * FROM orders WHERE user_id = :user_id"),
            {"user_id": user.id}
        ).fetchall()
    
    # ✅ GOOD: a single JOIN (1 query)
    results = session.execute(text("""
        SELECT 
            u.*,
            o.*
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
    """)).fetchall()
    
    # ❌ BAD: SELECT *
    session.execute(text("SELECT * FROM users"))  # fetches every column
    
    # ✅ GOOD: select only the columns you need
    session.execute(text("SELECT id, email, name FROM users"))
    
    # ❌ BAD: OR across different columns often defeats a simple index scan
    # NOTE(review): PostgreSQL can still combine indexes with a BitmapOr,
    # so "never uses indexes" would overstate it.
    session.execute(text("""
        SELECT * FROM products 
        WHERE category = 'electronics' OR brand = 'Apple'
    """))
    
    # ✅ GOOD: UNION of two separately indexable queries
    session.execute(text("""
        SELECT * FROM products WHERE category = 'electronics'
        UNION
        SELECT * FROM products WHERE brand = 'Apple'
    """))
    
    # ✅ BETTER: if the condition is AND, use a composite index
    # CREATE INDEX idx_products_category_brand ON products(category, brand)
    session.execute(text("""
        SELECT * FROM products 
        WHERE category = 'electronics' AND brand = 'Apple'
    """))

Transacciones y niveles de aislamiento:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# Engine + session factory for the transaction examples below
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)

@contextmanager
def transaction_scope(isolation_level='READ COMMITTED'):
    """Run a block inside a single transaction with the given isolation level.

    Commits on success, rolls back and re-raises on any exception, and
    always closes the session.

    Args:
        isolation_level: PostgreSQL isolation level for this transaction,
            e.g. 'READ COMMITTED', 'REPEATABLE READ', 'SERIALIZABLE'.

    Yields:
        Session: a session whose connection is configured with the
        requested isolation level.
    """
    session = Session()
    # Must be applied before any statement runs so the level covers the
    # whole transaction.
    session.connection(execution_options={"isolation_level": isolation_level})
    
    try:
        yield session
        session.commit()
    except Exception:
        # Dropped the unused `as e` binding; the exception propagates as-is.
        session.rollback()
        raise
    finally:
        session.close()

# ===== ACID PROPERTIES =====

def demonstrate_transactions():
    """Demonstrate ACID atomicity and the four SQL isolation levels."""
    
    # ATOMICITY: all or nothing
    with transaction_scope() as session:
        session.execute(text("""
            UPDATE accounts SET balance = balance - 100 WHERE id = 1
        """))
        session.execute(text("""
            UPDATE accounts SET balance = balance + 100 WHERE id = 2
        """))
        # Both updates commit together, or neither does (rollback)
    
    # ISOLATION LEVELS
    
    # 1. READ UNCOMMITTED (dirty reads possible)
    with transaction_scope('READ UNCOMMITTED') as session:
        # May observe uncommitted changes from other transactions
        # NOTE(review): PostgreSQL treats this level as READ COMMITTED.
        pass
    
    # 2. READ COMMITTED (PostgreSQL's default)
    with transaction_scope('READ COMMITTED') as session:
        # Sees only committed changes,
        # but repeated reads may return different data
        pass
    
    # 3. REPEATABLE READ
    with transaction_scope('REPEATABLE READ') as session:
        # Repeated reads within the transaction see the same data;
        # phantom reads (new rows) remain possible per the SQL standard
        pass
    
    # 4. SERIALIZABLE (strictest)
    with transaction_scope('SERIALIZABLE') as session:
        # Transactions behave as if executed sequentially;
        # no phantoms, but serialization failures may occur
        pass

# ===== DEADLOCKS AND OPTIMISTIC LOCKING =====

def handle_deadlocks():
    """Retry a transaction when the database reports a deadlock.

    Retries up to 3 times with exponential backoff. Bug fixes vs the
    original: a non-deadlock OperationalError is now re-raised immediately
    instead of being silently swallowed and retried, and the backoff is
    truly exponential (0.1s, 0.2s, ...) instead of linear.
    """
    from sqlalchemy.exc import OperationalError
    import time
    
    max_retries = 3
    for attempt in range(max_retries):
        try:
            with transaction_scope() as session:
                # Operations that may deadlock with a concurrent transaction
                session.execute(text("""
                    UPDATE inventory 
                    SET quantity = quantity - 1 
                    WHERE product_id = :id
                """), {"id": 123})
                
                session.execute(text("""
                    INSERT INTO order_items (product_id, quantity) 
                    VALUES (:id, 1)
                """), {"id": 123})
                
                break  # success
        
        except OperationalError as e:
            if 'deadlock detected' not in str(e).lower():
                # Unrelated operational error: do not retry, propagate.
                raise
            if attempt == max_retries - 1:
                raise
            print(f"Deadlock detected, retrying... (attempt {attempt + 1})")
            time.sleep(0.1 * (2 ** attempt))  # exponential backoff

# Optimistic locking with a version number
class ProductWithVersion:
    """Optimistic-locking update helper.

    Expected schema:

        CREATE TABLE products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            price DECIMAL(10,2),
            version INTEGER DEFAULT 1
        )
    """
    
    @staticmethod
    def update_with_optimistic_lock(product_id, new_price, expected_version):
        # The WHERE clause matches only if nobody bumped `version` since we
        # read it; a concurrent update turns this into a no-op.
        with transaction_scope() as session:
            result = session.execute(text("""
                UPDATE products 
                SET price = :new_price, version = version + 1
                WHERE id = :id AND version = :expected_version
                RETURNING version
            """), {
                "new_price": new_price,
                "id": product_id,
                "expected_version": expected_version
            })
            
            # NOTE(review): rowcount semantics with RETURNING are
            # driver-dependent — confirm it reports matched rows here.
            if result.rowcount == 0:
                raise Exception("Optimistic lock failed: product was modified")
            
            return result.fetchone()[0]  # the new version number

Receta 2.7: NoSQL - Cuándo y cómo usar

Comparación SQL vs NoSQL:

| Característica | SQL              | NoSQL                              |
|----------------|------------------|------------------------------------|
| Schema         | Rígido (tablas)  | Flexible (documentos, key-value)   |
| Escalabilidad  | Vertical         | Horizontal                         |
| Transacciones  | ACID completo    | Eventual consistency (mayormente)  |
| Joins          | ✅ Eficientes    | ❌ Limitados/evitar                |
| Casos de uso   | Finanzas, ERP    | Redes sociales, IoT, logs          |

MongoDB: Document Store

from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
import json

# Connect to MongoDB and select the working database
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']

# ===== BASIC OPERATIONS =====

def mongodb_crud():
    """Walk through MongoDB CRUD on the `products` collection."""
    products = db['products']
    
    # CREATE
    product = {
        'name': 'Laptop Pro',
        'price': 1299.99,
        'specs': {
            'cpu': 'Intel i7',
            'ram': '16GB',
            'storage': '512GB SSD'
        },
        'tags': ['electronics', 'computers'],
        'in_stock': True,
        'created_at': datetime.now()
    }
    result = products.insert_one(product)
    print(f"Inserted ID: {result.inserted_id}")
    
    # READ
    laptop = products.find_one({'name': 'Laptop Pro'})
    print(f"Found: {laptop}")
    
    # UPDATE
    products.update_one(
        {'name': 'Laptop Pro'},
        {'$set': {'price': 1199.99, 'in_stock': False}}
    )
    
    # DELETE
    products.delete_one({'name': 'Laptop Pro'})
    
    # BULK INSERT
    products.insert_many([
        {'name': 'Mouse', 'price': 29.99, 'category': 'accessories'},
        {'name': 'Keyboard', 'price': 79.99, 'category': 'accessories'},
    ])

# ===== ADVANCED QUERIES =====

def mongodb_queries():
    """Complex MongoDB queries: operators, text search, aggregation, lookup."""
    products = db['products']
    
    # Operator queries
    expensive = products.find({'price': {'$gt': 100}})  # greater than
    electronics = products.find({'tags': {'$in': ['electronics', 'computers']}})
    
    # Text search (requires a text index)
    products.create_index([('name', 'text'), ('description', 'text')])
    search_results = products.find({'$text': {'$search': 'laptop pro'}})
    
    # Aggregation pipeline
    pipeline = [
        # Stage 1: filter
        {'$match': {'in_stock': True}},
        
        # Stage 2: group
        {'$group': {
            '_id': '$category',
            'total_products': {'$sum': 1},
            'avg_price': {'$avg': '$price'},
            'max_price': {'$max': '$price'}
        }},
        
        # Stage 3: sort
        {'$sort': {'avg_price': -1}},
        
        # Stage 4: limit
        {'$limit': 10}
    ]
    
    results = list(products.aggregate(pipeline))
    print(f"Aggregation results: {json.dumps(results, indent=2, default=str)}")
    
    # Lookup (join-like stage)
    orders = db['orders']
    pipeline = [
        {'$lookup': {
            'from': 'products',
            'localField': 'product_id',
            'foreignField': '_id',
            'as': 'product_details'
        }},
        {'$unwind': '$product_details'}
    ]
    
    enriched_orders = list(orders.aggregate(pipeline))

# ===== DATA MODELING =====

def data_modeling_patterns():
    """Illustrate common MongoDB data-modeling patterns.

    Builds example documents for four patterns (embedding, referencing,
    hybrid, bucketing) purely for demonstration; nothing is persisted and
    the function returns None.
    """
    
    # PATTERN 1: Embedding (denormalization).
    # Use when: one-to-few relationship and the data is always read together.
    embedded_user = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com',
        # Embedded sub-document
        'address': {
            'street': '123 Main St',
            'city': 'NYC',
            'zip': '10001'
        },
        # Embedded array
        'recent_orders': [
            {'order_id': 'ord_1', 'total': 99.99},
            {'order_id': 'ord_2', 'total': 149.99}
        ]
    }
    
    # PATTERN 2: Referencing (normalization).
    # Use when: one-to-many relationship, pieces updated independently.
    parent_user = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com'
    }
    referenced_orders = [
        {'_id': 'ord_1', 'user_id': 'user_123', 'total': 99.99},
        {'_id': 'ord_2', 'user_id': 'user_123', 'total': 149.99}
    ]
    
    # PATTERN 3: Hybrid — denormalize the critical fields, reference the rest.
    hybrid_post = {
        '_id': 'post_456',
        'title': 'MongoDB Best Practices',
        # Denormalized basic author data
        'author': {
            'id': 'user_123',
            'name': 'Alice',
            'avatar': 'url'
        },
        'comments_count': 42,               # denormalized counter
        'comment_ids': ['cmt_1', 'cmt_2']   # references to the full comments
    }
    
    # PATTERN 4: Bucketing (time-series data).
    # Group many events into one document instead of one document per event.
    readings_bucket = {
        '_id': 'sensor_123_2024-01-15_10',
        'sensor_id': 'sensor_123',
        'date': datetime(2024, 1, 15, 10),
        'readings': [
            {'time': '10:00', 'temp': 22.5, 'humidity': 45},
            {'time': '10:01', 'temp': 22.6, 'humidity': 46},
            # ... up to 60 readings (one per minute)
        ]
    }

# ===== INDEXES AND PERFORMANCE =====

def mongodb_indexes():
    """Create the common MongoDB index types and inspect their usage."""
    products = db['products']
    
    # Simple index
    products.create_index([('price', ASCENDING)])
    
    # Compound index
    products.create_index([
        ('category', ASCENDING),
        ('price', DESCENDING)
    ])
    
    # Unique index
    users = db['users']
    users.create_index([('email', ASCENDING)], unique=True)
    
    # TTL index (documents auto-delete N seconds after `created_at`)
    sessions = db['sessions']
    sessions.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)
    
    # Text index
    products.create_index([('name', 'text'), ('description', 'text')])
    
    # List existing indexes
    print("Indexes:", list(products.list_indexes()))
    
    # Explain to see index usage
    # NOTE(review): default explain verbosity may omit 'executionStats' —
    # confirm against the pymongo/server version in use.
    explain = products.find({'price': {'$gt': 100}}).explain()
    print(f"Query plan: {explain['executionStats']}")

Redis: In-Memory Key-Value Store

import redis
import json
from datetime import timedelta

# Connect to Redis (decode_responses=True returns str instead of bytes)
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ===== BASIC OPERATIONS =====

def redis_basics():
    """Tour of Redis data types: strings, hashes, lists, sets, sorted sets."""
    
    # STRING operations
    r.set('user:1000:name', 'Alice')
    r.set('user:1000:email', 'alice@example.com')
    
    name = r.get('user:1000:name')
    print(f"Name: {name}")
    
    # SET with expiration
    r.setex('session:abc123', timedelta(hours=1), 'user_data')
    
    # Atomic increment
    r.set('page:views', 0)
    r.incr('page:views')
    r.incrby('page:views', 10)
    views = r.get('page:views')
    
    # HASH operations (object-like records)
    r.hset('user:1000', mapping={
        'name': 'Alice',
        'email': 'alice@example.com',
        'age': '30'
    })
    
    user = r.hgetall('user:1000')
    print(f"User: {user}")
    
    r.hincrby('user:1000', 'age', 1)  # Increment age
    
    # LIST operations (queues)
    r.lpush('queue:emails', 'email1@example.com')  # Push left
    r.rpush('queue:emails', 'email2@example.com')  # Push right
    
    email = r.lpop('queue:emails')  # Pop left (FIFO)
    print(f"Processing: {email}")
    
    # SET operations (unique values)
    r.sadd('tags:post:123', 'python', 'redis', 'nosql')
    r.sadd('tags:post:456', 'python', 'mongodb')
    
    # Intersection (tags shared by both posts)
    common_tags = r.sinter('tags:post:123', 'tags:post:456')
    print(f"Common tags: {common_tags}")
    
    # SORTED SET (leaderboards)
    r.zadd('leaderboard', {
        'Alice': 1500,
        'Bob': 1200,
        'Charlie': 1800
    })
    
    # Top 3
    top3 = r.zrevrange('leaderboard', 0, 2, withscores=True)
    print(f"Leaderboard: {top3}")
    
    # A user's rank (0-based; zrevrank returns None for unknown members)
    rank = r.zrevrank('leaderboard', 'Alice')
    print(f"Alice's rank: {rank + 1}")

# ===== USE CASES =====

class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def is_allowed(self, user_id: str, max_requests: int = 10, window_seconds: int = 60):
        """Allow at most `max_requests` per `window_seconds` for `user_id`.

        Each request is stored as a sorted-set member scored by its
        timestamp; members older than the window are pruned on every call.

        Returns:
            True if the request is within the limit, False otherwise.

        NOTE(review): the check-then-add sequence is not atomic; two
        concurrent callers can both pass the count check. Use a Lua script
        or MULTI/EXEC for a strict limit.
        """
        # Local imports keep the class self-contained (the original relied
        # on a module-level `import time` that appears later in the file).
        import time
        import uuid
        
        key = f"rate_limit:{user_id}"
        current_time = int(time.time())
        window_start = current_time - window_seconds
        
        # Drop requests that fell out of the sliding window
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # Count the requests still inside the window
        request_count = self.redis.zcard(key)
        
        if request_count < max_requests:
            # Bug fix: the member must be unique per request. The original
            # used str(current_time), so all requests within the same second
            # collapsed into one member and the limiter undercounted,
            # letting bursts exceed the limit.
            member = f"{current_time}:{uuid.uuid4()}"
            self.redis.zadd(key, {member: current_time})
            self.redis.expire(key, window_seconds)
            return True
        
        return False

class DistributedLock:
    """Distributed lock backed by Redis (single-instance pattern)."""
    
    # Lua: delete the key only if it still holds our token (atomic
    # compare-and-delete), so we never release a lock that a different
    # holder acquired after ours expired.
    _RELEASE_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) else return 0 end"
    )
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def acquire_lock(self, lock_name: str, timeout: int = 10):
        """Try to acquire the lock.

        Returns:
            A unique token (truthy) on success, or None if the lock is
            already held. Pass the token to release_lock() for a safe
            release.
        """
        import uuid
        
        token = str(uuid.uuid4())
        # SET NX (only if absent) with expiry, so a crashed holder cannot
        # block everyone forever.
        acquired = self.redis.set(
            f"lock:{lock_name}",
            token,
            nx=True,
            ex=timeout
        )
        return token if acquired else None
    
    def release_lock(self, lock_name: str, token: str = None):
        """Release the lock.

        With a token (recommended) the release is an atomic
        compare-and-delete that only succeeds if we still own the lock.
        Without one it falls back to the original unconditional delete,
        which can wrongly release another holder's lock if ours expired
        (the bug this change fixes for token-aware callers).
        """
        key = f"lock:{lock_name}"
        if token is None:
            self.redis.delete(key)  # legacy, unsafe fallback
        else:
            self.redis.eval(self._RELEASE_SCRIPT, 1, key, token)
    
    def with_lock(self, lock_name: str):
        """Context manager: acquire on enter, safely release on exit."""
        from contextlib import contextmanager
        
        @contextmanager
        def _lock():
            token = self.acquire_lock(lock_name)
            if not token:
                raise Exception(f"Could not acquire lock: {lock_name}")
            
            try:
                yield
            finally:
                self.release_lock(lock_name, token)
        
        return _lock()

class SessionStore:
    """Redis-backed session storage with a per-session TTL."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def create_session(self, user_id: str, session_data: dict, ttl: int = 3600):
        """Store a new session and return its generated id."""
        import uuid
        
        session_id = str(uuid.uuid4())
        payload = json.dumps({**session_data, 'user_id': user_id})
        self.redis.setex(f"session:{session_id}", ttl, payload)
        return session_id
    
    def get_session(self, session_id: str):
        """Return the session's data, or None when missing/expired."""
        raw = self.redis.get(f"session:{session_id}")
        if not raw:
            return None
        return json.loads(raw)
    
    def extend_session(self, session_id: str, ttl: int = 3600):
        """Reset the session's time-to-live."""
        self.redis.expire(f"session:{session_id}", ttl)
    
    def delete_session(self, session_id: str):
        """Remove the session immediately."""
        self.redis.delete(f"session:{session_id}")

# Exercise the three components defined above
import time

rate_limiter = RateLimiter(r)
for i in range(15):
    allowed = rate_limiter.is_allowed('user_123', max_requests=10, window_seconds=60)
    print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Rate limited'}")

distributed_lock = DistributedLock(r)
with distributed_lock.with_lock('critical_section'):
    print("Doing critical work...")
    time.sleep(1)

session_store = SessionStore(r)
sid = session_store.create_session('user_456', {'theme': 'dark'})
print(f"Session created: {sid}")
session = session_store.get_session(sid)
print(f"Session data: {session}")

Cuándo usar cada tipo de NoSQL:

  1. Document Store (MongoDB, CouchDB)

    • ✅ Esquemas flexibles que evolucionan
    • ✅ Datos jerárquicos (JSON-like)
    • ✅ Content management, catálogos de productos
  2. Key-Value (Redis, DynamoDB)

    • ✅ Caching
    • ✅ Session storage
    • ✅ Real-time analytics
    • ✅ Rate limiting
  3. Column-Family (Cassandra, HBase)

    • ✅ Time-series data
    • ✅ Logs a escala masiva
    • ✅ IoT sensor data
  4. Graph (Neo4j, ArangoDB)

    • ✅ Social networks
    • ✅ Recommendation engines
    • ✅ Fraud detection
    • ✅ Knowledge graphs