🏗️ FASE 2: Arquitectura y Diseño de Sistemas
Recetas prácticas para diseñar sistemas escalables y robustos
📚 Tabla de Contenidos
Bases de Datos
Receta 2.6: SQL Avanzado - Optimización
Índices: El arma secreta de performance
-- ===== INDEXES =====
-- Example table used throughout this section.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE,
    name VARCHAR(100),
    age INTEGER,
    city VARCHAR(100),
    created_at TIMESTAMP DEFAULT NOW()
);

-- ❌ WITHOUT an index: full table scan - O(n)
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Seq Scan on users (cost=0.00..1234.00 rows=1)

-- ✅ WITH an index: index scan - O(log n)
CREATE INDEX idx_users_email ON users(email);
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'alice@example.com';
-- Index Scan using idx_users_email (cost=0.42..8.44 rows=1)

-- COMPOSITE INDEXES (multiple columns)
CREATE INDEX idx_users_city_age ON users(city, age);

-- ✅ GOOD: uses the index (leading column `city` is constrained)
SELECT * FROM users WHERE city = 'NYC' AND age > 25;

-- ⚠️ PARTIAL: only part of the index helps
SELECT * FROM users WHERE age > 25; -- typically cannot use the index (age is not the leading column)

-- ❌ BAD: wrapping the column in a function defeats idx_users_email
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- ✅ SOLUTION: expression (functional) index
CREATE INDEX idx_users_email_lower ON users(LOWER(email));
SELECT * FROM users WHERE LOWER(email) = 'alice@example.com';

-- PARTIAL INDEX (covers only a subset of rows, so it stays small)
CREATE INDEX idx_active_users ON users(city) WHERE created_at > '2024-01-01';
-- Useful for queries that filter on recent data
SELECT * FROM users WHERE city = 'LA' AND created_at > '2024-01-01';

-- COVERING INDEX (extra payload columns via INCLUDE)
CREATE INDEX idx_users_city_covering ON users(city) INCLUDE (name, age);
-- Query satisfied entirely from the index (index-only scan, no heap read)
SELECT name, age FROM users WHERE city = 'Boston';
Window Functions: Análisis poderoso
-- ===== WINDOW FUNCTIONS =====
CREATE TABLE sales (
    id SERIAL PRIMARY KEY,
    salesperson VARCHAR(100),
    region VARCHAR(50),
    amount DECIMAL(10, 2),
    sale_date DATE
);

-- ROW_NUMBER: number rows (always unique, even on tied amounts)
SELECT
    salesperson,
    amount,
    ROW_NUMBER() OVER (ORDER BY amount DESC) as rank
FROM sales;

-- RANK vs DENSE_RANK on ties: RANK leaves gaps afterwards, DENSE_RANK does not
SELECT
    salesperson,
    amount,
    RANK() OVER (ORDER BY amount DESC) as rank,
    DENSE_RANK() OVER (ORDER BY amount DESC) as dense_rank
FROM sales;

-- PARTITION BY: the window (and the ranking) restarts per region
SELECT
    salesperson,
    region,
    amount,
    RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region
FROM sales;

-- Running totals (cumulative sum ordered by date)
SELECT
    sale_date,
    amount,
    SUM(amount) OVER (ORDER BY sale_date) as running_total
FROM sales
ORDER BY sale_date;

-- Moving average over a 7-row window (current row + 6 preceding)
SELECT
    sale_date,
    amount,
    AVG(amount) OVER (
        ORDER BY sale_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7days
FROM sales;

-- LAG and LEAD: peek at previous/next rows without a self-join
SELECT
    sale_date,
    amount,
    LAG(amount, 1) OVER (ORDER BY sale_date) as prev_day,
    LEAD(amount, 1) OVER (ORDER BY sale_date) as next_day,
    amount - LAG(amount, 1) OVER (ORDER BY sale_date) as day_over_day_change
FROM sales;

-- NTILE: split rows into N equally-sized buckets (here, quartiles)
SELECT
    salesperson,
    amount,
    NTILE(4) OVER (ORDER BY amount) as quartile
FROM sales;
Common Table Expressions (CTEs): Queries legibles
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Setup: engine + session factory. The connection string is a tutorial
# placeholder; real code should read it from configuration.
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
session = Session()
def demonstrate_ctes():
    """Build example CTE queries and execute the multi-CTE one.

    Shows three patterns: a simple CTE, a recursive CTE for
    hierarchies, and a chain of multiple CTEs feeding each other.

    Returns:
        list: rows from ``complex_query`` (monthly sales with moving
        average and growth rate).

    NOTE(review): ``query`` and ``org_chart_query`` are built for
    illustration only and are never executed here.
    """
    # ===== SIMPLE CTE =====
    # Customers whose lifetime order total exceeds 10,000.
    query = text("""
WITH high_value_customers AS (
SELECT
customer_id,
SUM(order_total) as lifetime_value
FROM orders
GROUP BY customer_id
HAVING SUM(order_total) > 10000
)
SELECT
c.name,
c.email,
hvc.lifetime_value
FROM customers c
JOIN high_value_customers hvc ON c.id = hvc.customer_id
ORDER BY hvc.lifetime_value DESC;
""")
    # ===== RECURSIVE CTE: hierarchies =====
    # Walk the employee -> manager tree from the CEO down, building a
    # readable path and depth level for each employee.
    org_chart_query = text("""
-- Tabla de empleados con manager_id
WITH RECURSIVE employee_hierarchy AS (
-- Caso base: CEO (sin manager)
SELECT
id,
name,
manager_id,
1 as level,
name as path
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Caso recursivo: empleados con managers
SELECT
e.id,
e.name,
e.manager_id,
eh.level + 1,
eh.path || ' > ' || e.name
FROM employees e
JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
level,
name,
path
FROM employee_hierarchy
ORDER BY level, name;
""")
    # ===== MULTIPLE CTEs =====
    # Pipeline: monthly totals -> 3-month moving average -> growth rate.
    complex_query = text("""
WITH
-- CTE 1: Ventas mensuales
monthly_sales AS (
SELECT
DATE_TRUNC('month', sale_date) as month,
SUM(amount) as total_sales
FROM sales
GROUP BY DATE_TRUNC('month', sale_date)
),
-- CTE 2: Promedio móvil
sales_with_avg AS (
SELECT
month,
total_sales,
AVG(total_sales) OVER (
ORDER BY month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3months
FROM monthly_sales
),
-- CTE 3: Growth rate
sales_with_growth AS (
SELECT
month,
total_sales,
moving_avg_3months,
(total_sales - LAG(total_sales) OVER (ORDER BY month)) /
LAG(total_sales) OVER (ORDER BY month) * 100 as growth_rate
FROM sales_with_avg
)
SELECT * FROM sales_with_growth
WHERE growth_rate IS NOT NULL
ORDER BY month DESC;
""")
    results = session.execute(complex_query)
    return results.fetchall()
# ===== QUERY OPTIMIZATION =====
def query_optimization_tips():
    """Illustrate common query anti-patterns and their fixes.

    Each ❌/✅ pair executes both the slow and the fast variant purely
    for demonstration; nothing is returned.
    """
    # ❌ BAD: N+1 queries — one query for the users, then one per user
    users = session.execute(text("SELECT * FROM users")).fetchall()
    for user in users:
        # A separate round-trip for every user (N extra queries)
        orders = session.execute(
            text("SELECT * FROM orders WHERE user_id = :user_id"),
            {"user_id": user.id}
        ).fetchall()
    # ✅ GOOD: a single JOIN (1 query)
    results = session.execute(text("""
SELECT
u.*,
o.*
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
""")).fetchall()
    # ❌ BAD: SELECT * pulls every column over the wire
    session.execute(text("SELECT * FROM users"))  # fetches all columns
    # ✅ GOOD: select only the columns you need
    session.execute(text("SELECT id, email, name FROM users"))
    # ❌ BAD: OR across different columns often blocks a single index scan
    # (NOTE(review): PostgreSQL can sometimes combine per-column indexes
    # with a BitmapOr — check EXPLAIN before rewriting)
    session.execute(text("""
SELECT * FROM products
WHERE category = 'electronics' OR brand = 'Apple'
"""))
    # ✅ GOOD: UNION of two individually indexable queries
    session.execute(text("""
SELECT * FROM products WHERE category = 'electronics'
UNION
SELECT * FROM products WHERE brand = 'Apple'
"""))
    # ✅ BETTER: if the condition is AND, a composite index fits perfectly
    # CREATE INDEX idx_products_category_brand ON products(category, brand)
    session.execute(text("""
SELECT * FROM products
WHERE category = 'electronics' AND brand = 'Apple'
"""))
Transacciones y niveles de aislamiento:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# Engine and session factory for the transaction examples below.
engine = create_engine('postgresql://localhost/mydb')
Session = sessionmaker(bind=engine)
@contextmanager
def transaction_scope(isolation_level='READ COMMITTED'):
    """Context manager that runs a block inside one transaction.

    Commits on success, rolls back on any exception (which is
    re-raised), and always closes the session.

    Args:
        isolation_level: transaction isolation level, e.g.
            'READ COMMITTED', 'REPEATABLE READ', 'SERIALIZABLE'.

    Yields:
        An open SQLAlchemy session bound to the module's engine.
    """
    session = Session()
    # Pin the isolation level on the connection backing this session.
    session.connection(execution_options={"isolation_level": isolation_level})
    try:
        yield session
        session.commit()
    except Exception:
        # Undo partial work, then propagate the original error unchanged.
        session.rollback()
        raise
    finally:
        session.close()
# ===== ACID PROPERTIES =====
def demonstrate_transactions():
    """Demonstrate ACID atomicity and the four isolation levels."""
    # ATOMICITY: all or nothing — both updates commit together or both
    # are rolled back (a money transfer must never half-apply).
    with transaction_scope() as session:
        session.execute(text("""
UPDATE accounts SET balance = balance - 100 WHERE id = 1
"""))
        session.execute(text("""
UPDATE accounts SET balance = balance + 100 WHERE id = 2
"""))
    # Both operations or neither (commit/rollback)

    # ISOLATION LEVELS
    # 1. READ UNCOMMITTED (dirty reads possible in the SQL standard;
    #    NOTE(review): PostgreSQL treats it the same as READ COMMITTED)
    with transaction_scope('READ UNCOMMITTED') as session:
        # May see uncommitted changes from other transactions.
        pass
    # 2. READ COMMITTED (PostgreSQL's default)
    with transaction_scope('READ COMMITTED') as session:
        # Sees only committed data, but repeated reads inside the same
        # transaction may observe different rows (non-repeatable reads).
        pass
    # 3. REPEATABLE READ
    with transaction_scope('REPEATABLE READ') as session:
        # Repeated reads return the same data within the transaction;
        # phantom reads (new rows) remain possible in standard SQL.
        pass
    # 4. SERIALIZABLE (strictest)
    with transaction_scope('SERIALIZABLE') as session:
        # Transactions behave as if executed one after another; no
        # phantoms, but they can fail with serialization errors and
        # must be retried by the caller.
        pass
# ===== DEADLOCKS AND OPTIMISTIC LOCKING =====
def handle_deadlocks():
    """Run a deadlock-prone transaction, retrying only on deadlock.

    Retries up to ``max_retries`` times with exponential backoff when
    the database reports a deadlock; any other operational error is
    re-raised immediately (the original code silently swallowed it).
    """
    from sqlalchemy.exc import OperationalError
    import time

    max_retries = 3
    for attempt in range(max_retries):
        try:
            with transaction_scope() as session:
                # Two statements touching related rows — the classic
                # shape that can deadlock against a concurrent writer.
                session.execute(text("""
UPDATE inventory
SET quantity = quantity - 1
WHERE product_id = :id
"""), {"id": 123})
                session.execute(text("""
INSERT INTO order_items (product_id, quantity)
VALUES (:id, 1)
"""), {"id": 123})
            break  # Success
        except OperationalError as e:
            if 'deadlock detected' not in str(e).lower():
                raise  # Not a deadlock — never swallow other errors
            if attempt < max_retries - 1:
                print(f"Deadlock detected, retrying... (attempt {attempt + 1})")
                time.sleep(0.1 * 2 ** attempt)  # exponential backoff
            else:
                raise
# Optimistic locking with a version number
class ProductWithVersion:
    """Optimistic concurrency control via a version column.

    Expected schema::

        CREATE TABLE products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            price DECIMAL(10,2),
            version INTEGER DEFAULT 1
        )
    """

    @staticmethod
    def update_with_optimistic_lock(product_id, new_price, expected_version):
        """Update price only if the row still has ``expected_version``.

        Returns:
            The new version number after a successful update.

        Raises:
            Exception: if the row was modified concurrently (version
                mismatch) or does not exist.
        """
        with transaction_scope() as session:
            result = session.execute(text("""
UPDATE products
SET price = :new_price, version = version + 1
WHERE id = :id AND version = :expected_version
RETURNING version
"""), {
                "new_price": new_price,
                "id": product_id,
                "expected_version": expected_version
            })
            # Check the RETURNING row instead of rowcount: rowcount is
            # driver-dependent for statements with RETURNING clauses.
            row = result.fetchone()
            if row is None:
                raise Exception("Optimistic lock failed: product was modified")
            return row[0]  # New version
Receta 2.7: NoSQL - Cuándo y cómo usar
Comparación SQL vs NoSQL:
| Característica | SQL | NoSQL |
|---|---|---|
| Schema | Rígido (tablas) | Flexible (documentos, key-value) |
| Escalabilidad | Principalmente vertical (con réplicas de lectura) | Horizontal (sharding nativo) |
| Transacciones | ACID completo | Eventual consistency (mayormente) |
| Joins | ✅ Eficientes | ❌ Limitados/evitar |
| Casos de uso | Finanzas, ERP | Redes sociales, IoT, logs |
MongoDB: Document Store
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
import json

# Connect to a local MongoDB instance and select the demo database.
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
# ===== BASIC OPERATIONS =====
def mongodb_crud():
    """Walk through create/read/update/delete on the products collection."""
    collection = db['products']

    # CREATE: insert one nested document
    laptop_doc = {
        'name': 'Laptop Pro',
        'price': 1299.99,
        'specs': {
            'cpu': 'Intel i7',
            'ram': '16GB',
            'storage': '512GB SSD'
        },
        'tags': ['electronics', 'computers'],
        'in_stock': True,
        'created_at': datetime.now()
    }
    insert_result = collection.insert_one(laptop_doc)
    print(f"Inserted ID: {insert_result.inserted_id}")

    # READ: fetch a single document by field match
    laptop = collection.find_one({'name': 'Laptop Pro'})
    print(f"Found: {laptop}")

    # UPDATE: $set touches only the listed fields
    collection.update_one(
        {'name': 'Laptop Pro'},
        {'$set': {'price': 1199.99, 'in_stock': False}}
    )

    # DELETE: remove the document again
    collection.delete_one({'name': 'Laptop Pro'})

    # BULK INSERT: several documents in one round-trip
    collection.insert_many([
        {'name': 'Mouse', 'price': 29.99, 'category': 'accessories'},
        {'name': 'Keyboard', 'price': 79.99, 'category': 'accessories'},
    ])
# ===== ADVANCED QUERIES =====
def mongodb_queries():
    """Operator queries, text search, and aggregation pipelines."""
    products = db['products']

    # Queries with comparison/membership operators
    expensive = products.find({'price': {'$gt': 100}})  # greater than
    electronics = products.find({'tags': {'$in': ['electronics', 'computers']}})

    # Text search (requires a text index on the searched fields)
    products.create_index([('name', 'text'), ('description', 'text')])
    search_results = products.find({'$text': {'$search': 'laptop pro'}})

    # Aggregation pipeline: stages run in order, each feeding the next
    pipeline = [
        # Stage 1: filter to in-stock products
        {'$match': {'in_stock': True}},
        # Stage 2: group per category with summary statistics
        {'$group': {
            '_id': '$category',
            'total_products': {'$sum': 1},
            'avg_price': {'$avg': '$price'},
            'max_price': {'$max': '$price'}
        }},
        # Stage 3: sort by average price, descending
        {'$sort': {'avg_price': -1}},
        # Stage 4: cap the output
        {'$limit': 10}
    ]
    results = list(products.aggregate(pipeline))
    print(f"Aggregation results: {json.dumps(results, indent=2, default=str)}")

    # $lookup: MongoDB's left outer join against another collection
    orders = db['orders']
    pipeline = [
        {'$lookup': {
            'from': 'products',
            'localField': 'product_id',
            'foreignField': '_id',
            'as': 'product_details'
        }},
        # $unwind flattens the joined array into one document per match
        {'$unwind': '$product_details'}
    ]
    enriched_orders = list(orders.aggregate(pipeline))
# ===== DATA MODELING =====
def data_modeling_patterns():
    """Four common MongoDB document-modeling patterns (illustrative literals)."""
    # PATTERN 1: Embedding (denormalization)
    # Use when: one-to-few relationship and the data is read together.
    user_with_address = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com',
        'address': {  # Embedded document
            'street': '123 Main St',
            'city': 'NYC',
            'zip': '10001'
        },
        'recent_orders': [  # Embedded array
            {'order_id': 'ord_1', 'total': 99.99},
            {'order_id': 'ord_2', 'total': 149.99}
        ]
    }
    # PATTERN 2: Referencing (normalization)
    # Use when: one-to-many relationship and the sides update independently.
    user = {
        '_id': 'user_123',
        'name': 'Alice',
        'email': 'alice@example.com'
    }
    orders = [
        {'_id': 'ord_1', 'user_id': 'user_123', 'total': 99.99},
        {'_id': 'ord_2', 'user_id': 'user_123', 'total': 149.99}
    ]
    # PATTERN 3: Hybrid (combination)
    # Denormalize the frequently-read fields, reference the rest.
    blog_post = {
        '_id': 'post_456',
        'title': 'MongoDB Best Practices',
        'author': {  # Denormalize the author's basic fields
            'id': 'user_123',
            'name': 'Alice',
            'avatar': 'url'
        },
        'comments_count': 42,  # Denormalized counter
        'comment_ids': ['cmt_1', 'cmt_2']  # Reference the full comments
    }
    # PATTERN 4: Bucketing (for time-series data)
    # Group many events into one document per time bucket.
    sensor_readings_bucket = {
        '_id': 'sensor_123_2024-01-15_10',
        'sensor_id': 'sensor_123',
        'date': datetime(2024, 1, 15, 10),
        'readings': [
            {'time': '10:00', 'temp': 22.5, 'humidity': 45},
            {'time': '10:01', 'temp': 22.6, 'humidity': 46},
            # ... up to 60 readings (one per minute)
        ]
    }
# ===== INDEXES AND PERFORMANCE =====
def mongodb_indexes():
    """Create the common MongoDB index types and inspect a query plan."""
    product_coll = db['products']

    # Single-field index
    product_coll.create_index([('price', ASCENDING)])

    # Compound index: category first, then price descending
    product_coll.create_index([
        ('category', ASCENDING),
        ('price', DESCENDING)
    ])

    # Unique index: rejects duplicate emails
    user_coll = db['users']
    user_coll.create_index([('email', ASCENDING)], unique=True)

    # TTL index: documents auto-expire N seconds after created_at
    session_coll = db['sessions']
    session_coll.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)

    # Text index for full-text search
    product_coll.create_index([('name', 'text'), ('description', 'text')])

    # List the indexes that now exist
    print("Indexes:", list(product_coll.list_indexes()))

    # explain() reveals whether a query actually used an index
    plan = product_coll.find({'price': {'$gt': 100}}).explain()
    print(f"Query plan: {plan['executionStats']}")
Redis: In-Memory Key-Value Store
import redis
import json
from datetime import timedelta

# Connect to a local Redis; decode_responses=True yields str instead of bytes.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ===== BASIC OPERATIONS =====
def redis_basics():
    """Tour of the core Redis types: string, hash, list, set, sorted set."""
    # STRING operations
    r.set('user:1000:name', 'Alice')
    r.set('user:1000:email', 'alice@example.com')
    user_name = r.get('user:1000:name')
    print(f"Name: {user_name}")

    # SET with expiry: the key disappears after one hour
    r.setex('session:abc123', timedelta(hours=1), 'user_data')

    # Atomic counters
    r.set('page:views', 0)
    r.incr('page:views')
    r.incrby('page:views', 10)
    view_count = r.get('page:views')

    # HASH operations (field/value objects)
    r.hset('user:1000', mapping={
        'name': 'Alice',
        'email': 'alice@example.com',
        'age': '30'
    })
    profile = r.hgetall('user:1000')
    print(f"User: {profile}")
    r.hincrby('user:1000', 'age', 1)  # atomically bump a single field

    # LIST operations (queues)
    r.lpush('queue:emails', 'email1@example.com')  # push on the left
    r.rpush('queue:emails', 'email2@example.com')  # push on the right
    next_email = r.lpop('queue:emails')  # pop from the left (FIFO)
    print(f"Processing: {next_email}")

    # SET operations (unique members)
    r.sadd('tags:post:123', 'python', 'redis', 'nosql')
    r.sadd('tags:post:456', 'python', 'mongodb')
    # Intersection: tags shared by both posts
    shared_tags = r.sinter('tags:post:123', 'tags:post:456')
    print(f"Common tags: {shared_tags}")

    # SORTED SET (leaderboards)
    r.zadd('leaderboard', {
        'Alice': 1500,
        'Bob': 1200,
        'Charlie': 1800
    })
    # Top three scores, highest first
    podium = r.zrevrange('leaderboard', 0, 2, withscores=True)
    print(f"Leaderboard: {podium}")
    # 0-based rank counted from the highest score
    alice_rank = r.zrevrank('leaderboard', 'Alice')
    print(f"Alice's rank: {alice_rank + 1}")
# ===== USE CASES =====
class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each request is stored as a sorted-set member scored by its
    timestamp; counting the members whose score falls inside the
    window gives the number of recent requests.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(self, user_id: str, max_requests: int = 10, window_seconds: int = 60):
        """Return True if ``user_id`` may make a request right now.

        Allows at most ``max_requests`` requests per ``window_seconds``
        sliding window.
        """
        import time
        import uuid

        key = f"rate_limit:{user_id}"
        current_time = int(time.time())
        window_start = current_time - window_seconds

        # Drop requests that fell out of the window
        self.redis.zremrangebyscore(key, 0, window_start)

        # Count requests still inside the window
        request_count = self.redis.zcard(key)
        if request_count < max_requests:
            # Record this request. The member must be unique per request:
            # the original used the bare timestamp, which collapses all
            # same-second requests into one member and under-counts them,
            # letting bursts through unthrottled.
            member = f"{current_time}:{uuid.uuid4().hex}"
            self.redis.zadd(key, {member: current_time})
            self.redis.expire(key, window_seconds)
            return True
        return False
class DistributedLock:
    """Distributed lock on Redis using SET NX plus a per-holder token.

    A random token identifies the holder, so release only deletes the
    key if this instance still owns it. The original unconditional
    DELETE could remove a lock another client acquired after our TTL
    expired.
    """

    # Atomically delete the key only when its value matches our token.
    _RELEASE_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self._tokens = {}  # lock_name -> token we currently hold

    def acquire_lock(self, lock_name: str, timeout: int = 10):
        """Try to acquire the lock.

        Returns a truthy token string on success (backward compatible
        with the previous True return), or None if already held.
        """
        import uuid
        lock_key = f"lock:{lock_name}"
        token = uuid.uuid4().hex
        # SET NX (only if absent) with an expiry so a crashed holder
        # cannot leave the lock stuck forever.
        acquired = self.redis.set(lock_key, token, nx=True, ex=timeout)
        if acquired:
            self._tokens[lock_name] = token
            return token
        return None

    def release_lock(self, lock_name: str):
        """Release the lock only if this instance still holds it."""
        token = self._tokens.pop(lock_name, None)
        if token is None:
            return
        self.redis.eval(self._RELEASE_SCRIPT, 1, f"lock:{lock_name}", token)

    def with_lock(self, lock_name: str):
        """Context manager wrapper: raises if the lock cannot be acquired."""
        from contextlib import contextmanager

        @contextmanager
        def _lock():
            if not self.acquire_lock(lock_name):
                raise Exception(f"Could not acquire lock: {lock_name}")
            try:
                yield
            finally:
                self.release_lock(lock_name)
        return _lock()
class SessionStore:
    """Store ephemeral user sessions in Redis with a time-to-live."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def create_session(self, user_id: str, session_data: dict, ttl: int = 3600):
        """Create a session and return its generated id."""
        import uuid
        session_id = str(uuid.uuid4())
        payload = json.dumps({**session_data, 'user_id': user_id})
        self.redis.setex(f"session:{session_id}", ttl, payload)
        return session_id

    def get_session(self, session_id: str):
        """Return the session dict, or None when missing or expired."""
        raw = self.redis.get(f"session:{session_id}")
        return json.loads(raw) if raw else None

    def extend_session(self, session_id: str, ttl: int = 3600):
        """Reset the session's time-to-live."""
        self.redis.expire(f"session:{session_id}", ttl)

    def delete_session(self, session_id: str):
        """Remove the session immediately."""
        self.redis.delete(f"session:{session_id}")
# Exercise the components above
import time

# Rate limiting: the first 10 requests pass, the remaining 5 are rejected.
rate_limiter = RateLimiter(r)
for i in range(15):
    allowed = rate_limiter.is_allowed('user_123', max_requests=10, window_seconds=60)
    print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Rate limited'}")

# Distributed lock around a critical section.
distributed_lock = DistributedLock(r)
with distributed_lock.with_lock('critical_section'):
    print("Doing critical work...")
    time.sleep(1)

# Session storage round-trip: create, read back.
session_store = SessionStore(r)
sid = session_store.create_session('user_456', {'theme': 'dark'})
print(f"Session created: {sid}")
session = session_store.get_session(sid)
print(f"Session data: {session}")
Cuándo usar cada tipo de NoSQL:
-
Document Store (MongoDB, CouchDB)
- ✅ Esquemas flexibles que evolucionan
- ✅ Datos jerárquicos (JSON-like)
- ✅ Content management, catálogos de productos
-
Key-Value (Redis, DynamoDB)
- ✅ Caching
- ✅ Session storage
- ✅ Real-time analytics
- ✅ Rate limiting
-
Column-Family (Cassandra, HBase)
- ✅ Time-series data
- ✅ Logs a escala masiva
- ✅ IoT sensor data
-
Graph (Neo4j, ArangoDB)
- ✅ Social networks
- ✅ Recommendation engines
- ✅ Fraud detection
- ✅ Knowledge graphs