🔒 Developer Cookbook - PHASE 4: Security and Observability

Practical recipes for building secure, monitored, and observable applications


📚 Table of Contents

  1. Cybersecurity for Developers
  2. Code Review and Static Analysis
  3. Observability and Monitoring

Cybersecurity for Developers

Recipe 4.1: OWASP Top 10 - Critical Vulnerabilities

What is the OWASP Top 10? A list of the ten most critical security risks in web applications, updated every 3-4 years.

OWASP Top 10 (2021):

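| #  | Vulnerability                              | Impact    | Prevalence     |
|----|--------------------------------------------|-----------|----------------|
| 1  | Broken Access Control                      | 🔴 High   | 🔴 Very common |
| 2  | Cryptographic Failures                     | 🔴 High   | 🟡 Common      |
| 3  | Injection                                  | 🔴 High   | 🟡 Common      |
| 4  | Insecure Design                            | 🟡 Medium | 🟢 Moderate    |
| 5  | Security Misconfiguration                  | 🟡 Medium | 🔴 Very common |
| 6  | Vulnerable and Outdated Components         | 🟡 Medium | 🔴 Very common |
| 7  | Identification and Authentication Failures | 🔴 High   | 🟡 Common      |
| 8  | Software and Data Integrity Failures       | 🔴 High   | 🟢 Moderate    |
| 9  | Security Logging and Monitoring Failures   | 🟡 Medium | 🟡 Common      |
| 10 | Server-Side Request Forgery                | 🟡 Medium | 🟢 Moderate    |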

Recipe 4.2: Injection Attacks - SQL, NoSQL, Command

A3: Injection

SQL Injection

❌ VULNERABLE:

# Flask/Python - NEVER do this
@app.route('/user/<user_id>')
def get_user(user_id):
    # Vulnerable to SQL injection: user input is interpolated into the query
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    
    # Attack: /user/1 OR 1=1
    # Query: SELECT * FROM users WHERE id = 1 OR 1=1
    # Returns ALL users

✅ SAFE - Prepared Statements:

# ✅ GOOD: use parameterized queries
@app.route('/user/<user_id>')
def get_user(user_id):
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))
    
    # Or with an ORM (SQLAlchemy), which binds parameters for you
    user = db.session.query(User).filter(User.id == user_id).first()
    if user is None:
        return jsonify({'error': 'User not found'}), 404
    return jsonify(user.to_dict())

# ✅ GOOD: with additional validation
@app.route('/user/<int:user_id>')  # The int URL converter only matches non-negative integers
def get_user(user_id):
    if user_id < 1:
        return jsonify({'error': 'Invalid user ID'}), 400
    
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))

Common SQL Injection cases:

# 1. Login bypass
# ❌ VULNERABLE
username = request.form['username']
password = request.form['password']
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"

# Attack: username = admin'-- 
# Query: SELECT * FROM users WHERE username='admin'-- AND password='...'
# The rest of the query is commented out, so the password check is bypassed!

# ✅ SAFE: bind the username, then verify the salted hash in application code
# (assumes the password column stores a Werkzeug-style hash)
query = "SELECT password FROM users WHERE username=%s"
cursor.execute(query, (username,))
row = cursor.fetchone()
authenticated = row is not None and check_password_hash(row[0], password)

# 2. UNION-based injection
# ❌ VULNERABLE
search = request.args.get('q')
query = f"SELECT title, content FROM articles WHERE title LIKE '%{search}%'"

# Attack: q=' UNION SELECT username, password FROM users--
# Extracts credentials!

# ✅ SAFE
search = request.args.get('q', '')
query = "SELECT title, content FROM articles WHERE title LIKE %s"
cursor.execute(query, (f'%{search}%',))

# 3. Blind SQL Injection
# ❌ VULNERABLE
product_id = request.args.get('id')
query = f"SELECT * FROM products WHERE id={product_id}"

# Attack: id=1 AND (SELECT SLEEP(5))
# If the response takes 5 seconds, the endpoint is vulnerable!

# ✅ SAFE: prepared statements + validation
# (a missing parameter is None, so check for that before calling isdigit)
if not product_id or not product_id.isdigit():
    return jsonify({'error': 'Invalid ID'}), 400

query = "SELECT * FROM products WHERE id=%s"
cursor.execute(query, (product_id,))
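
The difference between string interpolation and parameter binding can be reproduced end to end with the standard library. The sketch below is illustrative only: it uses an in-memory SQLite database, the table and function names are made up for the demo, and SQLite's placeholder is `?` rather than the `%s` used by the drivers in the snippets above.

```python
import sqlite3

def find_user_unsafe(conn, user_id):
    # VULNERABLE: the input becomes part of the SQL text itself
    return conn.execute(f"SELECT name FROM users WHERE id = {user_id}").fetchall()

def find_user_safe(conn, user_id):
    # SAFE: the driver binds the value, so it is never parsed as SQL
    return conn.execute("SELECT name FROM users WHERE id = ?", (user_id,)).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)", [(1, "alice"), (2, "bob")])

payload = "1 OR 1=1"
leaked = find_user_unsafe(conn, payload)  # the OR clause matches every row
bound = find_user_safe(conn, payload)     # the payload is compared as a plain value

print(len(leaked), len(bound))
```

With binding, the payload is just an odd-looking value that matches no id, so the query returns nothing instead of the whole table.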

NoSQL Injection (MongoDB)

# ❌ VULNERABLE
@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')
    
    # Vulnerable to NoSQL injection: values go into the query unchecked
    user = db.users.find_one({
        'username': username,
        'password': password
    })
    
    # Attack JSON:
    # {"username": {"$ne": null}, "password": {"$ne": null}}
    # Matches the first user in the collection (often an admin)

# ✅ SEGURO
@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')
    
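    # Validate types: rejecting non-string values is what blocks operator
    # objects such as {"$ne": null}. Stripping '$' from a string is not
    # needed after this check and would silently change the username.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Invalid credentials'}), 400
    
    # Use explicit operators so the value is always treated as a literal
    user = db.users.find_one({
        'username': {'$eq': username}
    })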
    
    if user and check_password_hash(user['password'], password):
        return jsonify({'token': generate_token(user['_id'])})
    
    return jsonify({'error': 'Invalid credentials'}), 401
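
The type check is the step that actually defeats operator injection, so it can be worth isolating in a small helper. This is a minimal sketch with a hypothetical function name (`require_str_fields`); it has no Flask or MongoDB dependency and only inspects the decoded JSON body.

```python
def require_str_fields(payload, fields):
    """Return the named fields if each is a non-empty string, else raise ValueError."""
    if not isinstance(payload, dict):
        raise ValueError("JSON body must be an object")
    values = {}
    for name in fields:
        value = payload.get(name)
        # A dict such as {"$ne": None} fails this check and never reaches the query
        if not isinstance(value, str) or not value:
            raise ValueError(f"Field '{name}' must be a non-empty string")
        values[name] = value
    return values

good = require_str_fields({"username": "ana", "password": "s3cret"}, ["username", "password"])

try:
    require_str_fields({"username": {"$ne": None}, "password": {"$ne": None}},
                       ["username", "password"])
    attack_rejected = False
except ValueError:
    attack_rejected = True
```

In a route, the `ValueError` would map to a 400 response before any database call is made.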

Command Injection

# ❌ VULNERABLE
import subprocess

@app.route('/ping')
def ping():
    host = request.args.get('host')
    
    # Vulnerable to command injection: the shell interprets the input
    result = subprocess.run(f'ping -c 4 {host}', shell=True, capture_output=True)
    
    # Attack: host=google.com; cat /etc/passwd
    # Runs: ping -c 4 google.com; cat /etc/passwd

# ✅ SAFE
import subprocess
import re

@app.route('/ping')
def ping():
    host = request.args.get('host', '')
    
    # 1. Validate the format (hostnames/IPs only). Requiring an alphanumeric
    #    first character also blocks option injection such as host=-f
    if not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]*', host):
        return jsonify({'error': 'Invalid host'}), 400
    
    # 2. Do NOT use shell=True
    # 3. Pass the command as a list
    try:
        result = subprocess.run(
            ['ping', '-c', '4', host],
            capture_output=True,
            text=True,
            timeout=10
        )
        return jsonify({'output': result.stdout})
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Timeout'}), 504
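
Host validation is easier to test when it is separated from the route. The following sketch is one possible stricter validator, not part of the original recipe: it accepts literal IP addresses through the standard `ipaddress` module and otherwise checks each hostname label. The helper names `is_valid_host` and `build_ping_argv` are hypothetical.

```python
import ipaddress
import re

# One DNS label: alphanumeric at both ends, hyphens allowed inside, max 63 chars
_HOSTNAME_LABEL = re.compile(r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?")

def is_valid_host(host):
    if not isinstance(host, str) or not host or len(host) > 253:
        return False
    try:
        ipaddress.ip_address(host)  # accepts IPv4 and IPv6 literals
        return True
    except ValueError:
        pass
    return all(_HOSTNAME_LABEL.fullmatch(label) for label in host.split("."))

def build_ping_argv(host):
    """Return the argument list for subprocess.run; no shell is involved."""
    if not is_valid_host(host):
        raise ValueError("Invalid host")
    return ["ping", "-c", "4", host]

print(build_ping_argv("google.com"))
```

Because no label may start with a hyphen, a value like `-f` can never be passed to `ping` as an option.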

General prevention:

import re

class InputValidator:
    """Input validator"""
    
    @staticmethod
    def sanitize_string(input_str: str, max_length: int = 255) -> str:
        """Sanitize a general-purpose string"""
        if not isinstance(input_str, str):
            raise ValueError("Input must be string")
        
        # Truncate
        sanitized = input_str[:max_length]
        
        # Remove dangerous characters
        dangerous_chars = ['<', '>', '&', '"', "'", ';', '|', '`', '$']
        for char in dangerous_chars:
            sanitized = sanitized.replace(char, '')
        
        return sanitized.strip()
    
    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate an email address"""
        if not isinstance(email, str):
            return False
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return bool(re.fullmatch(pattern, email))
    
    @staticmethod
    def validate_username(username: str) -> bool:
        """Validate a username (alphanumeric, underscores, and hyphens)"""
        return bool(re.fullmatch(r'[a-zA-Z0-9_-]{3,20}', username))
    
    @staticmethod
    def validate_sql_safe(value: str) -> bool:
        """Heuristic check for SQL keywords.

        Defense in depth only: substring matching also rejects harmless values
        such as 'Gordon' (contains 'OR'), and it never replaces parameterized queries.
        """
        sql_keywords = [
            'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
            'ALTER', 'UNION', 'OR', 'AND', '--', '/*', '*/', 'xp_'
        ]
        upper_value = value.upper()
        return not any(keyword.upper() in upper_value for keyword in sql_keywords)

# Usage
validator = InputValidator()

@app.route('/register', methods=['POST'])
def register():
    try:
        username = validator.sanitize_string(request.json.get('username'))
        email = request.json.get('email')
        
        if not validator.validate_username(username):
            return jsonify({'error': 'Invalid username'}), 400
        
        if not validator.validate_email(email):
            return jsonify({'error': 'Invalid email'}), 400
        
        # Continue with registration...
    except ValueError as e:
        return jsonify({'error': str(e)}), 400

Recipe 4.3: XSS (Cross-Site Scripting)

What is XSS? Injection of malicious JavaScript into web pages that other users view.

Types of XSS:

1. Reflected XSS (Non-persistent)

# ❌ VULNERABLE
@app.route('/search')
def search():
    query = request.args.get('q', '')
    
    # Vulnerable to XSS: renders the input without escaping
    return f"""
    <html>
        <body>
            <h1>Resultados para: {query}</h1>
        </body>
    </html>
    """
    
    # Attack: /search?q=<script>alert('XSS')</script>
    # The JavaScript executes!

# ✅ SAFE
from markupsafe import escape

@app.route('/search')
def search():
    query = request.args.get('q', '')
    
    # Escape HTML entities
    safe_query = escape(query)
    
    return f"""
    <html>
        <body>
            <h1>Resultados para: {safe_query}</h1>
        </body>
    </html>
    """

# Or better (alternative version of the same route): use a template engine with auto-escaping
@app.route('/search')
def search():
    query = request.args.get('q', '')
    return render_template('search.html', query=query)

<!-- templates/search.html (Jinja2 auto-escapes by default in Flask) -->
<!DOCTYPE html>
<html>
<body>
    <h1>Resultados para: {{ query }}</h1>
    
    <!-- If you NEED unescaped HTML (DANGEROUS) -->
    <!-- <div>{{ content | safe }}</div> -->
</body>
</html>
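
To see what escaping does to the attack string, the standard library's `html.escape` is enough; it is used here only to keep the sketch dependency-free, while the recipe itself relies on `markupsafe` and Jinja2. The function name `render_heading` is illustrative.

```python
import html

def render_heading(query):
    # quote=True also escapes single and double quotes, which matters inside attributes
    return f"<h1>Results for: {html.escape(query, quote=True)}</h1>"

rendered = render_heading("<script>alert('XSS')</script>")
print(rendered)
# <h1>Results for: &lt;script&gt;alert(&#x27;XSS&#x27;)&lt;/script&gt;</h1>
```

The browser displays the payload as text because no `<script>` element is ever created.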

2. Stored XSS (Persistent)

# ❌ VULNERABLE
from datetime import datetime

@app.route('/comment', methods=['POST'])
def add_comment():
    comment = request.json.get('comment')
    
    # Stored without sanitizing
    db.comments.insert_one({
        'text': comment,
        'created_at': datetime.now()
    })
    
    return jsonify({'status': 'success'})

@app.route('/comments')
def get_comments():
    comments = list(db.comments.find())
    
    # Rendered without escaping
    html = '<div>'
    for comment in comments:
        html += f"<p>{comment['text']}</p>"
    html += '</div>'
    
    return html
    
    # Attack: comment = "<img src=x onerror='alert(document.cookie)'>"
    # It runs for EVERY user who views the comments!

# ✅ SEGURO
import bleach

@app.route('/comment', methods=['POST'])
def add_comment():
    comment = request.json.get('comment')
    
    # Sanitize the HTML, allowing only safe tags
    clean_comment = bleach.clean(
        comment,
        tags=['b', 'i', 'u', 'p', 'br'],  # Allowed tags
        attributes={},  # No attributes
        strip=True
    )
    
    db.comments.insert_one({
        'text': clean_comment,
        'created_at': datetime.now()
    })
    
    return jsonify({'status': 'success'})

@app.route('/comments')
def get_comments():
    comments = list(db.comments.find())
    
    # Render with auto-escaping
    return render_template('comments.html', comments=comments)

3. DOM-based XSS (Client-side)

<!-- ❌ VULNERABLE -->
<script>
function displayName() {
    // Read from the URL
    const name = new URLSearchParams(window.location.search).get('name');
    
    // Insert into the DOM without sanitizing
    document.getElementById('greeting').innerHTML = 'Hello, ' + name;
    
    // Attack: ?name=<img src=x onerror='alert(1)'>
}
</script>

<!-- ✅ SEGURO -->
<script>
function displayName() {
    const name = new URLSearchParams(window.location.search).get('name');
    
    // Option A: use textContent instead of innerHTML
    document.getElementById('greeting').textContent = 'Hello, ' + name;
    
    // Option B: if markup is required, sanitize with DOMPurify first
    const cleanName = DOMPurify.sanitize(name);
    document.getElementById('greeting').innerHTML = 'Hello, ' + cleanName;
}
</script>

Content Security Policy (CSP):

from flask import Flask

app = Flask(__name__)

@app.after_request
def set_csp(response):
    """Configure the Content Security Policy"""
    csp = {
        "default-src": "'self'",
        # Avoid 'unsafe-inline' in script-src: it re-enables inline scripts and
        # removes most of CSP's XSS protection. Prefer nonces or hashes.
        "script-src": "'self' https://cdn.example.com",
        "style-src": "'self' 'unsafe-inline'",
        "img-src": "'self' data: https:",
        "font-src": "'self' https://fonts.gstatic.com",
        "connect-src": "'self' https://api.example.com",
        "frame-ancestors": "'none'",
        "base-uri": "'self'",
        "form-action": "'self'"
    }
    
    csp_string = '; '.join([f"{k} {v}" for k, v in csp.items()])
    response.headers['Content-Security-Policy'] = csp_string
    
    # Other security headers
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    # X-XSS-Protection is deprecated; '0' turns off the legacy browser filter
    response.headers['X-XSS-Protection'] = '0'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    
    return response
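
When inline scripts cannot be avoided, a per-response nonce is a common alternative to `'unsafe-inline'`. The sketch below only builds the header value; wiring the nonce into the templates (as `<script nonce="...">`) is left out, and the helper names `new_nonce` and `build_csp` are assumptions made for this example.

```python
import secrets

def new_nonce():
    # A fresh, unguessable value must be generated for every response
    return secrets.token_urlsafe(16)

def build_csp(nonce):
    directives = {
        "default-src": "'self'",
        # Only scripts from this origin or carrying this nonce may run
        "script-src": f"'self' 'nonce-{nonce}'",
        "object-src": "'none'",
        "base-uri": "'self'",
        "frame-ancestors": "'none'",
    }
    return "; ".join(f"{name} {value}" for name, value in directives.items())

nonce = new_nonce()
header_value = build_csp(nonce)
print(header_value)
```

An injected `<script>` tag does not know the nonce, so the browser refuses to execute it even though it appears inline.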

Recipe 4.4: CSRF (Cross-Site Request Forgery)

What is CSRF? An attacker tricks an authenticated user's browser into performing an action the user did not intend.

Attack scenario:

<!-- Malicious site: evil.com -->
<html>
<body>
    <h1>You won a prize!</h1>
    
    <!-- Hidden form that triggers an action on bank.com -->
    <form id="evil" action="https://bank.com/transfer" method="POST">
        <input type="hidden" name="to" value="attacker">
        <input type="hidden" name="amount" value="10000">
    </form>
    
    <script>
        // Auto-submit as soon as the user visits the page
        document.getElementById('evil').submit();
    </script>
</body>
</html>

<!-- If the user is logged in to bank.com, the transfer goes through! -->

❌ VULNERABLE:

@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    # No CSRF protection
    to = request.form.get('to')
    amount = request.form.get('amount')
    
    # Executes the transfer
    execute_transfer(current_user, to, amount)
    
    return jsonify({'status': 'success'})

✅ SAFE - CSRF Tokens:

import os
import secrets

from flask import Flask
from flask_wtf.csrf import CSRFProtect

app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ['SECRET_KEY']  # Never hard-code the key
csrf = CSRFProtect(app)

@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    # Flask-WTF verifies the CSRF token automatically
    to = request.form.get('to')
    amount = request.form.get('amount')
    
    execute_transfer(current_user, to, amount)
    
    return jsonify({'status': 'success'})

<!-- Template with CSRF token -->
<form method="POST" action="/transfer">
    <!-- CSRF token -->
    <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
    
    <input type="text" name="to" placeholder="Destinatario">
    <input type="number" name="amount" placeholder="Monto">
    <button type="submit">Transferir</button>
</form>

For APIs (SPA with AJAX):

# Backend: generate the token and send it in a cookie
import hmac
from flask import jsonify, make_response, request, session

@app.route('/api/csrf-token')
def get_csrf_token():
    token = secrets.token_urlsafe(32)
    
    # Store it in the session
    session['csrf_token'] = token
    
    response = make_response(jsonify({'csrf_token': token}))
    
    # Cookie with SameSite
    response.set_cookie(
        'csrf_token',
        token,
        httponly=False,  # JS needs to read it
        secure=True,
        samesite='Strict'
    )
    
    return response

@app.route('/api/transfer', methods=['POST'])
@login_required
def api_transfer():
    # Validate the token from the header (constant-time comparison)
    token_from_header = request.headers.get('X-CSRF-Token')
    token_from_session = session.get('csrf_token')
    
    if (not token_from_header or not token_from_session
            or not hmac.compare_digest(token_from_header.encode(), token_from_session.encode())):
        return jsonify({'error': 'CSRF token mismatch'}), 403
    
    # Process the transfer
    data = request.json
    execute_transfer(current_user, data['to'], data['amount'])
    
    return jsonify({'status': 'success'})

// Frontend: include the token in requests
async function transfer(to, amount) {
    // Fetch the token
    const tokenResponse = await fetch('/api/csrf-token');
    const { csrf_token } = await tokenResponse.json();
    
    // Send it in a header
    const response = await fetch('/api/transfer', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'X-CSRF-Token': csrf_token
        },
        body: JSON.stringify({ to, amount })
    });
    
    return response.json();
}
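
A variation on the session-stored token is a stateless token signed with HMAC and bound to the session identifier, so the server only needs its secret key to verify it. This is a sketch under that assumption, not the mechanism Flask-WTF uses; the function names and token layout (`nonce.mac`) are invented for the example.

```python
import hashlib
import hmac
import secrets

def _sign(secret_key: bytes, session_id: str, nonce: str) -> str:
    message = f"{session_id}:{nonce}".encode()
    return hmac.new(secret_key, message, hashlib.sha256).hexdigest()

def issue_csrf_token(secret_key: bytes, session_id: str) -> str:
    nonce = secrets.token_urlsafe(16)  # never contains '.'
    return f"{nonce}.{_sign(secret_key, session_id, nonce)}"

def verify_csrf_token(secret_key: bytes, session_id: str, token) -> bool:
    if not isinstance(token, str):
        return False
    nonce, separator, mac = token.partition(".")
    if not separator:
        return False
    expected = _sign(secret_key, session_id, nonce)
    # Constant-time comparison; bytes avoid errors on non-ASCII input
    return hmac.compare_digest(mac.encode(), expected.encode())

key = secrets.token_bytes(32)
token = issue_csrf_token(key, "session-abc")
print(verify_csrf_token(key, "session-abc", token))
```

A token copied into another user's session fails verification because the session identifier is part of the signed message.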

SameSite Cookie Attribute:

# Additional protection against CSRF
@app.after_request
def set_cookie_samesite(response):
    """Add SameSite=Lax to cookies that do not declare SameSite"""
    cookies = response.headers.getlist('Set-Cookie')
    if cookies:
        # Replace the headers; adding to them would send each cookie twice
        del response.headers['Set-Cookie']
        for cookie in cookies:
            if 'samesite' not in cookie.lower():
                cookie += '; SameSite=Lax'
            response.headers.add('Set-Cookie', cookie)
    
    return response

# SameSite options:
# - Strict: cookie is never sent on cross-site requests (most secure)
# - Lax: cookie is sent on top-level navigations that use safe methods such as GET (balanced)
# - None: cookie is always sent (requires the Secure flag)

response.set_cookie(
    'session',
    session_id,
    httponly=True,
    secure=True,
    samesite='Strict'
)

Recipe 4.5: Authentication & Authorization

A7: Identification and Authentication Failures

Secure Authentication

import os
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
from datetime import datetime, timedelta
import jwt

class AuthManager:
    """Secure authentication management"""
    
    # Load the key from the environment (the variable name is illustrative); never hard-code it
    SECRET_KEY = os.environ['JWT_SECRET_KEY']
    
    @staticmethod
    def hash_password(password: str) -> str:
        """Hash a password with a salt"""
        # Use bcrypt or argon2 in production
        return generate_password_hash(password, method='pbkdf2:sha256', salt_length=16)
    
    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Verify a password"""
        return check_password_hash(password_hash, password)
    
    @staticmethod
    def validate_password_strength(password: str) -> tuple[bool, str]:
        """Validate password strength"""
        # Check against common passwords first: placed after the other rules,
        # the entries in this list could never reach this check
        common_passwords = ['password123', 'qwerty123', '123456789']
        if password.lower() in common_passwords:
            return False, "Password is too common"
        
        if len(password) < 12:
            return False, "Password must be at least 12 characters"
        
        if not any(c.isupper() for c in password):
            return False, "Password must contain uppercase letter"
        
        if not any(c.islower() for c in password):
            return False, "Password must contain lowercase letter"
        
        if not any(c.isdigit() for c in password):
            return False, "Password must contain digit"
        
        if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in password):
            return False, "Password must contain special character"
        
        return True, "Password is strong"
    
    @staticmethod
    def generate_jwt(user_id: str, expires_in: int = 3600) -> str:
        """Generate a JWT"""
        payload = {
            'user_id': user_id,
            'exp': datetime.utcnow() + timedelta(seconds=expires_in),
            'iat': datetime.utcnow()
        }
        
        return jwt.encode(payload, AuthManager.SECRET_KEY, algorithm='HS256')
    
    @staticmethod
    def verify_jwt(token: str) -> dict:
        """Verify a JWT"""
        try:
            payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=['HS256'])
            return {'valid': True, 'user_id': payload['user_id']}
        except jwt.ExpiredSignatureError:
            return {'valid': False, 'error': 'Token expired'}
        except jwt.InvalidTokenError:
            return {'valid': False, 'error': 'Invalid token'}

# Usage
@app.route('/register', methods=['POST'])
def register():
    data = request.json
    password = data.get('password')
    
    # Validate strength
    is_valid, message = AuthManager.validate_password_strength(password)
    if not is_valid:
        return jsonify({'error': message}), 400
    
    # Hash password
    password_hash = AuthManager.hash_password(password)
    
    # Create the user
    user = db.users.insert_one({
        'email': data['email'],
        'password': password_hash,
        'created_at': datetime.utcnow()
    })
    
    return jsonify({'status': 'success', 'user_id': str(user.inserted_id)})

@app.route('/login', methods=['POST'])
def login():
    data = request.get_json(silent=True) or {}
    
    # Rate limiting (prevents brute force)
    if is_rate_limited(request.remote_addr):
        return jsonify({'error': 'Too many attempts'}), 429
    
    email = data.get('email')
    password = data.get('password')
    
    # Reject non-string values to block NoSQL operator injection (see Recipe 4.2)
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({'error': 'Invalid credentials'}), 400
    
    # Look up the user
    user = db.users.find_one({'email': email})
    
    if not user or not AuthManager.verify_password(password, user['password']):
        # Record the failed attempt
        log_failed_login(request.remote_addr, email)
        
        # Generic message (do not reveal whether the user exists)
        return jsonify({'error': 'Invalid credentials'}), 401
    
    # Generate the token
    token = AuthManager.generate_jwt(str(user['_id']))
    
    return jsonify({'token': token})
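
The login route calls `is_rate_limited` without defining it. One way such a helper could work is a sliding-window counter per client key, sketched below. It keeps state in process memory, so it only suits a single worker; a shared store such as Redis would be needed across processes. The class name and default limits are assumptions.

```python
import time
from collections import defaultdict, deque

class SlidingWindowLimiter:
    """Allow at most max_attempts per key within window_seconds."""

    def __init__(self, max_attempts=5, window_seconds=300, clock=time.monotonic):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._attempts = defaultdict(deque)

    def hit(self, key):
        """Record an attempt for key; return True if the key is now over the limit."""
        now = self._clock()
        attempts = self._attempts[key]
        # Drop attempts that have fallen out of the window
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        attempts.append(now)
        return len(attempts) > self.max_attempts

limiter = SlidingWindowLimiter(max_attempts=3, window_seconds=60)

def is_rate_limited(client_ip):
    return limiter.hit(client_ip)
```

Keying on the IP address alone can be bypassed with many addresses, so counting failures per account as well is a common complement.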

Multi-Factor Authentication (MFA)

import pyotp
import qrcode
from io import BytesIO
import base64

class MFAManager:
    """MFA management (TOTP)"""
    
    @staticmethod
    def generate_secret() -> str:
        """Generate a secret for TOTP"""
        return pyotp.random_base32()
    
    @staticmethod
    def generate_qr_code(secret: str, user_email: str, issuer: str = 'MyApp') -> str:
        """Generate a QR code for authenticator apps such as Google Authenticator"""
        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user_email,
            issuer_name=issuer
        )
        
        # Generate the QR code
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(provisioning_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        
        # Convert to base64
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        img_str = base64.b64encode(buffer.getvalue()).decode()
        
        return f"data:image/png;base64,{img_str}"
    
    @staticmethod
    def verify_totp(secret: str, code: str) -> bool:
        """Verify a TOTP code"""
        totp = pyotp.TOTP(secret)
        return totp.verify(code, valid_window=1)  # Also accepts the previous and next code
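
For readers curious about what `pyotp` computes, the TOTP algorithm (RFC 6238, built on HOTP from RFC 4226) fits in a few lines of standard-library code. This is a study sketch, not a replacement for the library: it covers only HMAC-SHA1, and the function name `totp_code` is made up. The sample secret is the RFC's ASCII test key encoded as base32.

```python
import base64
import hashlib
import hmac
import struct
import time

def totp_code(secret_b32, at_time=None, step=30, digits=6):
    key = base64.b32decode(secret_b32, casefold=True)
    # Number of time steps since the Unix epoch
    counter = int((time.time() if at_time is None else at_time) // step)
    mac = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte window
    offset = mac[-1] & 0x0F
    number = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(number % 10**digits).zfill(digits)

rfc_secret = base64.b32encode(b"12345678901234567890").decode()
print(totp_code(rfc_secret, at_time=59, digits=8))  # 94287082 (RFC 6238 test vector)
```

Server and authenticator app share the secret and the clock, so both derive the same code for the current 30-second step; `valid_window=1` simply checks the neighboring steps too.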

# Endpoints
@app.route('/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
    """Set up MFA for the user"""
    user_id = current_user['_id']
    
    # Generate the secret
    secret = MFAManager.generate_secret()
    
    # Store the secret (encrypted in production)
    db.users.update_one(
        {'_id': user_id},
        {'$set': {'mfa_secret': secret, 'mfa_enabled': False}}
    )
    
    # Generate the QR code
    qr_code = MFAManager.generate_qr_code(secret, current_user['email'])
    
    return jsonify({
        'qr_code': qr_code,
        'secret': secret  # For manual entry
    })

@app.route('/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
    """Verify the code and enable MFA"""
    code = request.json.get('code')
    user = db.users.find_one({'_id': current_user['_id']})
    
    if MFAManager.verify_totp(user['mfa_secret'], code):
        # Enable MFA
        db.users.update_one(
            {'_id': current_user['_id']},
            {'$set': {'mfa_enabled': True}}
        )
        
        # Generate backup codes (only their hashes are stored)
        backup_codes = [secrets.token_hex(8) for _ in range(10)]
        db.users.update_one(
            {'_id': current_user['_id']},
            {'$set': {'backup_codes': [hash_code(c) for c in backup_codes]}}
        )
        
        return jsonify({
            'status': 'success',
            'backup_codes': backup_codes  # Shown only ONCE
        })
    
    return jsonify({'error': 'Invalid code'}), 400

from bson import ObjectId

@app.route('/login/mfa', methods=['POST'])
def login_with_mfa():
    """Login with MFA"""
    # Step 1: verify email/password (returns a temp token)
    # Step 2: verify the MFA code together with the temp token
    
    data = request.json
    temp_token = data.get('temp_token')
    mfa_code = data.get('mfa_code')
    
    # Verify the temp token
    payload = verify_temp_token(temp_token)
    if not payload:
        return jsonify({'error': 'Invalid token'}), 401
    
    # Assumes _id is a MongoDB ObjectId while the token carries it as a string
    user = db.users.find_one({'_id': ObjectId(payload['user_id'])})
    if not user:
        return jsonify({'error': 'Invalid token'}), 401
    
    # Verify the MFA code
    if MFAManager.verify_totp(user['mfa_secret'], mfa_code):
        # Generate the final token
        token = AuthManager.generate_jwt(str(user['_id']))
        return jsonify({'token': token})
    
    # Or verify a backup code
    if verify_backup_code(user, mfa_code):
        token = AuthManager.generate_jwt(str(user['_id']))
        return jsonify({'token': token})
    
    return jsonify({'error': 'Invalid MFA code'}), 401
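
The endpoints above rely on `hash_code` and `verify_backup_code`, which the recipe does not define. A minimal sketch of how they might behave follows. Plain SHA-256 is assumed to be acceptable here only because the codes are 64 bits of random data rather than user-chosen passwords; `consume_backup_code` is a hypothetical name, and persisting the remaining hashes back to the database is left to the caller.

```python
import hashlib
import hmac
import secrets

def generate_backup_codes(count=10):
    return [secrets.token_hex(8) for _ in range(count)]

def hash_code(code: str) -> str:
    return hashlib.sha256(code.encode()).hexdigest()

def consume_backup_code(stored_hashes, candidate):
    """Return (matched, remaining_hashes); a matched code is removed so it works only once."""
    if not isinstance(candidate, str):
        return False, list(stored_hashes)
    candidate_hash = hash_code(candidate)
    for index, stored in enumerate(stored_hashes):
        if hmac.compare_digest(stored, candidate_hash):
            return True, stored_hashes[:index] + stored_hashes[index + 1:]
    return False, list(stored_hashes)

codes = generate_backup_codes(3)
hashes = [hash_code(c) for c in codes]
matched, remaining = consume_backup_code(hashes, codes[1])
print(matched, len(remaining))
```

Removing the hash on first use is what makes each backup code single-use.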

Role-Based Access Control (RBAC)

from functools import wraps
from flask import g

class Role:
    ADMIN = 'admin'
    MODERATOR = 'moderator'
    USER = 'user'
    GUEST = 'guest'

class Permission:
    READ = 'read'
    WRITE = 'write'
    DELETE = 'delete'
    ADMIN = 'admin'

# Permission matrix by role
ROLE_PERMISSIONS = {
    Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
    Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
    Role.USER: [Permission.READ, Permission.WRITE],
    Role.GUEST: [Permission.READ]
}

def require_permission(permission: str):
    """Decorator that requires a permission"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr avoids an AttributeError when no user was attached to g
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401
            
            user_role = user.get('role', Role.GUEST)
            user_permissions = ROLE_PERMISSIONS.get(user_role, [])
            
            if permission not in user_permissions:
                return jsonify({'error': 'Insufficient permissions'}), 403
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

def require_role(role: str):
    """Decorator that requires a specific role"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr avoids an AttributeError when no user was attached to g
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401
            
            if user.get('role') != role:
                return jsonify({'error': 'Insufficient permissions'}), 403
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# Usage
@app.route('/admin/users')
@require_role(Role.ADMIN)
def list_users():
    """Only admins can list users"""
    users = list(db.users.find({}, {'password': 0}))
    for user in users:
        user['_id'] = str(user['_id'])  # ObjectId is not JSON serializable
    return jsonify(users)

@app.route('/posts/<post_id>', methods=['DELETE'])
@require_permission(Permission.WRITE)
def delete_post(post_id):
    """Admins and moderators can delete any post; users only their own"""
    post = db.posts.find_one({'_id': post_id})
    if post is None:
        return jsonify({'error': 'Post not found'}), 404
    
    # Requiring DELETE in the decorator would lock regular users out entirely,
    # so the DELETE permission is checked here together with ownership
    can_delete_any = Permission.DELETE in ROLE_PERMISSIONS.get(g.user.get('role'), [])
    if not can_delete_any and post['author_id'] != g.user['_id']:
        return jsonify({'error': 'Can only delete own posts'}), 403
    
    db.posts.delete_one({'_id': post_id})
    return jsonify({'status': 'success'})
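
Authorization rules are easier to verify when they live in plain functions that do not depend on Flask. The sketch below restates the role matrix from this recipe with string literals and adds a hypothetical `can_delete_post` helper that combines the permission check with ownership.

```python
ROLE_PERMISSIONS = {
    "admin": {"read", "write", "delete", "admin"},
    "moderator": {"read", "write", "delete"},
    "user": {"read", "write"},
    "guest": {"read"},
}

def has_permission(user, permission):
    # Unknown or missing users fall back to the guest role
    role = (user or {}).get("role", "guest")
    return permission in ROLE_PERMISSIONS.get(role, set())

def can_delete_post(user, post):
    if has_permission(user, "delete"):
        return True  # admins and moderators may delete any post
    # Everyone else needs write access and must own the post
    return has_permission(user, "write") and post.get("author_id") == user.get("_id")

post = {"_id": "p1", "author_id": "u1"}
print(can_delete_post({"_id": "u2", "role": "moderator"}, post))
```

A decorator can then stay thin: it authenticates the request and delegates the decision to functions like these.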

Recipe 4.6: Encryption and Secrets Management

Encryption in transit vs. at rest:

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class EncryptionManager:
    """Encryption management"""
    
    @staticmethod
    def generate_key() -> bytes:
        """Generate a key for Fernet"""
        return Fernet.generate_key()
    
    @staticmethod
    def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
        """Derive a key from a password using PBKDF2"""
        if salt is None:
            salt = os.urandom(16)
        
        # The class is PBKDF2HMAC; cryptography has no class named PBKDF2
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=600000  # In line with current OWASP guidance for PBKDF2-HMAC-SHA256
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt
    
    @staticmethod
    def encrypt(data: str, key: bytes) -> str:
        """Encrypt data"""
        f = Fernet(key)
        encrypted = f.encrypt(data.encode())
        return encrypted.decode()
    
    @staticmethod
    def decrypt(encrypted_data: str, key: bytes) -> str:
        """Decrypt data"""
        f = Fernet(key)
        decrypted = f.decrypt(encrypted_data.encode())
        return decrypted.decode()
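
The key-derivation step can be tried without installing `cryptography`, since the standard library exposes the same PBKDF2 primitive. This sketch only produces a key in the shape Fernet expects (32 raw bytes, URL-safe base64 encoded); the function name and the iteration count are assumptions, and the salt must be stored alongside the ciphertext to derive the same key again.

```python
import base64
import hashlib
import os

def derive_fernet_style_key(password, salt=None, iterations=600_000):
    if salt is None:
        salt = os.urandom(16)  # a new random salt for each new key
    raw = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations, dklen=32)
    return base64.urlsafe_b64encode(raw), salt

key, salt = derive_fernet_style_key("correct horse battery staple", iterations=10_000)
same_key, _ = derive_fernet_style_key("correct horse battery staple", salt, iterations=10_000)
print(len(key), key == same_key)
```

The same password and salt always yield the same key, which is why the salt is not secret but must not be lost.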

# Example: encrypt sensitive data in the DB
from datetime import datetime
from typing import Optional

class UserSecrets:
    """Management of sensitive user data"""
    
    def __init__(self, encryption_key: bytes):
        self.encryption_manager = EncryptionManager()
        self.key = encryption_key
    
    def store_credit_card(self, user_id: str, card_number: str):
        """Store an encrypted card number"""
        # The CVV is deliberately not stored: PCI DSS forbids keeping it
        # after authorization, even in encrypted form
        encrypted_card = self.encryption_manager.encrypt(card_number, self.key)
        
        # Store it (only the last 4 digits stay in plain text, for the UI)
        db.user_payment_methods.insert_one({
            'user_id': user_id,
            'card_number_encrypted': encrypted_card,
            'last_four': card_number[-4:],
            'created_at': datetime.utcnow()
        })
    
    def get_credit_card(self, user_id: str) -> Optional[dict]:
        """Return the decrypted card, or None if the user has none"""
        card = db.user_payment_methods.find_one({'user_id': user_id})
        
        if not card:
            return None
        
        # Decrypt
        card_number = self.encryption_manager.decrypt(
            card['card_number_encrypted'],
            self.key
        )
        
        return {
            'card_number': card_number,
            'last_four': card['last_four']
        }

# HTTPS/TLS Configuration (Nginx)
"""
# nginx.conf
server {
    listen 443 ssl http2;
    server_name myapp.com;
    
    # SSL certificates (Let's Encrypt)
    ssl_certificate /etc/letsencrypt/live/myapp.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/myapp.com/privkey.pem;
    
    # SSL Configuration (Mozilla Intermediate)
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384';
    ssl_prefer_server_ciphers off;
    
    # HSTS
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    
    # OCSP Stapling
    ssl_stapling on;
    ssl_stapling_verify on;
    
    location / {
        proxy_pass http://app:3000;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name myapp.com;
    return 301 https://$host$request_uri;
}
"""

Secrets Management:

# ❌ NEVER do this
API_KEY = "sk_live_abc123"  # Hard-coded in source
DATABASE_PASSWORD = "mypassword123"

# ✅ GOOD: environment variables
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv('API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')

# ✅ BETTER: secret managers (AWS Secrets Manager, HashiCorp Vault)
import boto3
import json

class SecretsManager:
    """Wrapper for AWS Secrets Manager"""
    
    def __init__(self, region_name='us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region_name)
        self._cache = {}
    
    def get_secret(self, secret_name: str) -> dict:
        """Get a secret (cached)"""
        if secret_name in self._cache:
            return self._cache[secret_name]
        
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            secret = json.loads(response['SecretString'])
            self._cache[secret_name] = secret
            return secret
        except Exception as e:
            print(f"Error retrieving secret: {e}")
            raise
    
    def rotate_secret(self, secret_name: str, new_value: dict):
        """Rotate a secret"""
        self.client.update_secret(
            SecretId=secret_name,
            SecretString=json.dumps(new_value)
        )
        
        # Invalidate the cache
        self._cache.pop(secret_name, None)

# Uso
secrets = SecretsManager()

# Obtener credenciales de DB
db_creds = secrets.get_secret('myapp/database')
DATABASE_URL = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/{db_creds['database']}"

# API keys
api_keys = secrets.get_secret('myapp/api-keys')
STRIPE_KEY = api_keys['stripe']
SENDGRID_KEY = api_keys['sendgrid']
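
The in-memory cache in `SecretsManager` never expires, so a value rotated by another process would go unnoticed until the application restarts. A minimal sketch of a time-limited cache follows. `TTLSecretCache`, its `fetch` callable, and the 60-second TTL are hypothetical names and values chosen for illustration, and the demo uses a fake fetcher and a fake clock instead of a real AWS call.

```python
import time
from typing import Callable, Dict, Tuple


class TTLSecretCache:
    """Cache secrets for a limited time so rotated values are eventually re-read."""

    def __init__(self, fetch: Callable[[str], dict], ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch        # hypothetical uncached loader (e.g. a call to get_secret_value)
        self._ttl = ttl_seconds
        self._clock = clock        # injectable so the demo can control time
        self._entries: Dict[str, Tuple[float, dict]] = {}

    def get(self, name: str) -> dict:
        now = self._clock()
        entry = self._entries.get(name)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]        # still fresh: serve from cache
        value = self._fetch(name)  # missing or expired: reload
        self._entries[name] = (now, value)
        return value


# Demo with a fake fetcher and a fake clock (no AWS calls)
calls = []
fake_now = [0.0]

def fake_fetch(name: str) -> dict:
    calls.append(name)
    return {"password": f"v{len(calls)}"}

cache = TTLSecretCache(fake_fetch, ttl_seconds=60, clock=lambda: fake_now[0])
first = cache.get("myapp/database")
second = cache.get("myapp/database")   # served from cache
fake_now[0] = 61.0
third = cache.get("myapp/database")    # TTL elapsed, fetched again
```

A short TTL trades a few extra API calls for a bounded delay before a rotated secret is picked up.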

Terraform - Secrets management:

# secrets.tf

# Generate a random password
resource "random_password" "db_password" {
  length  = 32
  special = true
}

# Store it in AWS Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
  name = "myapp/database"
  
  recovery_window_in_days = 7
}

resource "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id = aws_secretsmanager_secret.db_credentials.id
  
  secret_string = jsonencode({
    username = "postgres"
    password = random_password.db_password.result
    host     = aws_db_instance.main.endpoint
    database = "myapp"
  })
}

# IAM policy granting read access to the secret
resource "aws_iam_role_policy" "secrets_access" {
  role = aws_iam_role.ecs_task.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = [
          aws_secretsmanager_secret.db_credentials.arn
        ]
      }
    ]
  })
}

# ECS task with secrets
resource "aws_ecs_task_definition" "api" {
  family = "myapp-api"
  
  container_definitions = jsonencode([
    {
      name  = "api"
      image = "myapp:latest"
      
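      # Secrets injected from Secrets Manager as environment variables.
      # The stored secret is a JSON document, so one key is selected with the
      # ":password::" suffix; the bare ARN would inject the whole JSON string.
      # ECS resolves these with the task execution role.
      secrets = [
        {
          name      = "DB_PASSWORD"
          valueFrom = "${aws_secretsmanager_secret.db_credentials.arn}:password::"
        }
      ]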
    }
  ])
}

Code Review and Static Analysis

Recipe 4.7: Static Analysis - SonarQube and CodeQL

SonarQube - Configuration:

# sonar-project.properties
sonar.projectKey=myapp
sonar.projectName=My Application
sonar.projectVersion=1.0

# Paths
sonar.sources=src
sonar.tests=tests
sonar.python.coverage.reportPaths=coverage.xml

# Exclusions
sonar.exclusions=**/node_modules/**,**/vendor/**,**/*.test.js

# Coverage exclusions
sonar.coverage.exclusions=**/*.test.js,**/tests/**

# Quality Gate
sonar.qualitygate.wait=true

GitHub Actions with SonarQube:

# .github/workflows/sonarqube.yml
name: SonarQube Analysis

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  sonarqube:
    name: SonarQube Scan
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Full history (no shallow clone) for more accurate analysis
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      
      - name: Run tests with coverage
        run: pytest --cov=src --cov-report=xml
      
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
      
      - name: Check Quality Gate
        uses: sonarsource/sonarqube-quality-gate-action@master
        timeout-minutes: 5
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

CodeQL - Security analysis:

# .github/workflows/codeql.yml
name: CodeQL Analysis

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 0 * * 1'  # Weekly scan

jobs:
  analyze:
    name: Analyze
    runs-on: ubuntu-latest
    
    permissions:
      actions: read
      contents: read
      security-events: write
    
    strategy:
      fail-fast: false
      matrix:
        language: ['python', 'javascript']
    
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v3
        with:
          languages: ${{ matrix.language }}
          queries: security-extended
      
      - name: Autobuild
        uses: github/codeql-action/autobuild@v3
      
      - name: Perform CodeQL Analysis
        uses: github/codeql-action/analyze@v3
        with:
          category: "/language:${{ matrix.language }}"

Custom CodeQL Queries:

/**
 * @name Hardcoded credentials
 * @description Detects hardcoded passwords and API keys
 * @kind problem
 * @problem.severity error
 * @id python/hardcoded-credentials
 */

import python

from StrConst str
where
  (
    str.getText().regexpMatch("(?i).*(password|passwd|pwd)\\s*=\\s*['\"][^'\"]+['\"].*") or
    str.getText().regexpMatch("(?i).*(api_key|apikey|secret|token)\\s*=\\s*['\"][^'\"]+['\"].*")
  )
  and not str.getText().regexpMatch(".*=\\s*['\"]\\$\\{.*") // Exclude environment-variable placeholders
select str, "Potential hardcoded credential found"
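
The same pattern-matching idea can be tried locally before wiring it into CodeQL. The sketch below is a plain-Python approximation, not a CodeQL equivalent: `find_hardcoded_credentials` and the sample input are hypothetical, and a line-based regex misses multi-line assignments and can produce false positives.

```python
import re
from typing import List, Tuple

# A credential-like name assigned a quoted literal (same keywords as the query above)
CREDENTIAL_RE = re.compile(
    r"(?P<name>\w*(?:password|passwd|pwd|api_key|apikey|secret|token)\w*)"
    r"\s*=\s*['\"](?P<value>[^'\"]+)['\"]",
    re.IGNORECASE,
)


def find_hardcoded_credentials(source: str) -> List[Tuple[int, str]]:
    """Return (line number, variable name) for each suspicious assignment."""
    findings = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = CREDENTIAL_RE.search(line)
        # Skip "${...}" placeholders, mirroring the exclusion in the query
        if match and not match.group("value").startswith("${"):
            findings.append((lineno, match.group("name")))
    return findings


# Hypothetical sample input
sample = "\n".join([
    'API_KEY = "sk_live_abc123"',
    'DATABASE_PASSWORD = "mypassword123"',
    'API_KEY = os.getenv("API_KEY")',
    'token = "${CI_TOKEN}"',
])
findings = find_hardcoded_credentials(sample)
```

Only the first two lines are reported: the `os.getenv` call has no quoted literal after `=`, and the `${CI_TOKEN}` placeholder is excluded.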

Recipe 4.8: Dependency Scanning

Dependabot Configuration:

# .github/dependabot.yml
version: 2
updates:
  # Python dependencies
  - package-ecosystem: "pip"
    directory: "/"
    schedule:
      interval: "weekly"
    open-pull-requests-limit: 10
    reviewers:
      - "security-team"
    labels:
      - "dependencies"
      - "security"
    
    # Ignore patch-level version updates for all dependencies
    ignore:
      - dependency-name: "*"
        update-types: ["version-update:semver-patch"]
  
  # NPM dependencies
  - package-ecosystem: "npm"
    directory: "/frontend"
    schedule:
      interval: "weekly"
    versioning-strategy: increase
  
  # Docker
  - package-ecosystem: "docker"
    directory: "/"
    schedule:
      interval: "weekly"
  
  # GitHub Actions
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: "weekly"

Snyk - Vulnerability Scanning:

# .github/workflows/snyk.yml
name: Snyk Security

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  security:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Run Snyk to check for vulnerabilities
        uses: snyk/actions/python@master
        continue-on-error: true  # Let the SARIF upload run even when issues are found
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=high --sarif-file-output=snyk.sarif
      
      - name: Upload result to GitHub Code Scanning
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: snyk.sarif

OWASP Dependency-Check:

# Install
wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.0.0/dependency-check-8.0.0-release.zip
unzip dependency-check-8.0.0-release.zip

# Run the scan
./dependency-check/bin/dependency-check.sh \
  --project "MyApp" \
  --scan ./src \
  --format HTML \
  --format JSON \
  --out ./reports

# With npm audit
npm audit --json > npm-audit.json

# Automatic fix
npm audit fix
npm audit fix --force  # Also applies breaking (semver-major) updates
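
A JSON report is mostly useful when something reads it. The sketch below shows one way a CI step might fail the build on high or critical findings. It assumes the npm 7+ report shape, in which `metadata.vulnerabilities` holds per-severity counts; `count_blocking` is a hypothetical helper and the sample report is hand-written, not real scan output.

```python
import json

BLOCKING_SEVERITIES = ("high", "critical")


def count_blocking(report: dict, severities=BLOCKING_SEVERITIES) -> int:
    """Sum the per-severity counts that should fail the build."""
    counts = report.get("metadata", {}).get("vulnerabilities", {})
    return sum(int(counts.get(severity, 0)) for severity in severities)


# Hand-written sample in the assumed report shape (not real scan output)
sample_report = json.loads("""
{
  "metadata": {
    "vulnerabilities": {
      "info": 0, "low": 2, "moderate": 1, "high": 1, "critical": 0, "total": 4
    }
  }
}
""")

blocking = count_blocking(sample_report)
exit_code = 1 if blocking else 0   # a CI script would pass this to sys.exit()
```

In a pipeline the report would be loaded from `npm-audit.json` instead of the inline sample.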

Safety (Python):

# Install
pip install safety

# Scan
safety check

# With requirements.txt
safety check -r requirements.txt

# JSON output
safety check --json > safety-report.json

# Ignore specific vulnerabilities
safety check --ignore 12345

Observability and Monitoring

Recipe 4.9: The 3 Pillars - Logs, Metrics, Traces

1. LOGS - Structured Logging:

import logging
from datetime import datetime, timezone
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """Custom formatter for structured logs"""
    
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        
        # Add custom fields (timezone-aware UTC timestamp)
        log_record['timestamp'] = datetime.now(timezone.utc).isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        
        # Add request context when present
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id

# Configure the logger
def setup_logging():
    """Configure structured logging"""
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Handler for stderr, the StreamHandler default (Docker/K8s capture both stdout and stderr)
    handler = logging.StreamHandler()
    
    # JSON format
    formatter = CustomJsonFormatter(
        '%(timestamp)s %(level)s %(name)s %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    return logger

logger = setup_logging()

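# Usage with request context
import threading
import uuid

class RequestContext(threading.local):
    """Per-request logging context.

    Thread-local, so concurrent requests served by different threads
    do not overwrite each other's values.
    """
    
    def __init__(self):
        self.request_id = None
        self.user_id = None
    
    def set_request_id(self, request_id):
        self.request_id = request_id
    
    def set_user_id(self, user_id):
        self.user_id = user_id

context = RequestContext()

@app.before_request
def before_request():
    """Initialize the request context"""
    context.set_request_id(str(uuid.uuid4()))
    context.set_user_id(None)  # Reset so a previous request's user never leaks
    
    # Extract user_id from the JWT (header format: "Bearer <token>")
    auth_header = request.headers.get('Authorization', '')
    token = auth_header.removeprefix('Bearer ').strip()
    if token:
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
            context.set_user_id(payload.get('user_id'))
        except jwt.InvalidTokenError:
            pass  # Invalid or expired token: log the request as anonymous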

# Logging with context
@app.route('/api/orders', methods=['POST'])
def create_order():
    logger.info(
        'Creating order',
        extra={
            'request_id': context.request_id,
            'user_id': context.user_id,
            'endpoint': '/api/orders',
            'method': 'POST'
        }
    )
    
    try:
        order = process_order(request.json)
        
        logger.info(
            'Order created successfully',
            extra={
                'request_id': context.request_id,
                'user_id': context.user_id,
                'order_id': order['id'],
                'total': order['total']
            }
        )
        
        return jsonify(order), 201
    
    except Exception as e:
        logger.error(
            'Failed to create order',
            extra={
                'request_id': context.request_id,
                'user_id': context.user_id,
                'error': str(e),
                'error_type': type(e).__name__
            },
            exc_info=True  # Include stack trace
        )
        
        return jsonify({'error': 'Internal error'}), 500

# Appropriate log levels
logger.debug('Detailed info for debugging')  # Development only
logger.info('Normal operation')  # Important events
logger.warning('Something unexpected')  # Potential problem
logger.error('Error occurred')  # Error that affects functionality
logger.critical('System unstable')  # Requires immediate attention
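
Passing `request_id` and `user_id` through `extra=` on every call is easy to forget. One possible alternative is a `logging.Filter` that copies the current context onto each record. The sketch below uses only the standard library; `ContextFilter` and the dict standing in for the request context are hypothetical, and a plain text formatter replaces the JSON formatter to keep the demo self-contained.

```python
import io
import logging


class ContextFilter(logging.Filter):
    """Copy request-scoped values onto every record passing through the handler."""

    def __init__(self, get_context):
        super().__init__()
        self._get_context = get_context   # callable returning a dict of fields

    def filter(self, record: logging.LogRecord) -> bool:
        for key, value in self._get_context().items():
            if not hasattr(record, key):  # an explicit extra= value wins
                setattr(record, key, value)
        return True                       # never drop the record


# Demo: a plain dict stands in for the request context
current = {"request_id": "req-123", "user_id": 42}
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(request_id)s %(user_id)s %(message)s"))
handler.addFilter(ContextFilter(lambda: current))

demo_logger = logging.getLogger("context_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info("Creating order")   # no extra= needed
output = stream.getvalue()
```

Attaching the filter to the handler rather than to one logger means records from any logger that reaches the handler get the same fields.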

2. METRICS - Prometheus:

from prometheus_client import Counter, Histogram, Gauge, Summary, generate_latest
from prometheus_client import REGISTRY
from flask import Response
import time

# Define metrics
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

active_users = Gauge(
    'active_users',
    'Number of active users'
)

order_value = Summary(
    'order_value_dollars',
    'Order value in dollars'
)

# Middleware to instrument requests
@app.before_request
def before_request_metrics():
    request.start_time = time.time()

@app.after_request
def after_request_metrics(response):
    # Increment the counter
    http_requests_total.labels(
        method=request.method,
        endpoint=request.endpoint or 'unknown',
        status=response.status_code
    ).inc()
    
    # Record the duration
    if hasattr(request, 'start_time'):
        duration = time.time() - request.start_time
        http_request_duration_seconds.labels(
            method=request.method,
            endpoint=request.endpoint or 'unknown'
        ).observe(duration)
    
    return response

# Metrics endpoint
@app.route('/metrics')
def metrics():
    """Endpoint scraped by Prometheus"""
    return Response(generate_latest(REGISTRY), mimetype='text/plain')

# Business metrics
@app.route('/api/orders', methods=['POST'])
def create_order():
    order = process_order(request.json)
    
    # Record the order value
    order_value.observe(order['total'])
    
    return jsonify(order)

# Custom metrics
class BusinessMetrics:
    """Business metrics"""
    
    def __init__(self):
        self.revenue = Counter(
            'revenue_total_dollars',
            'Total revenue',
            ['product_category']
        )
        
        self.cart_size = Histogram(
            'cart_size_items',
            'Items per cart',
            buckets=[1, 5, 10, 20, 50]
        )
        
        self.checkout_duration = Histogram(
            'checkout_duration_seconds',
            'Time to complete checkout',
            buckets=[1, 5, 10, 30, 60, 120]
        )
    
    def record_sale(self, category: str, amount: float):
        """Record a sale"""
        self.revenue.labels(product_category=category).inc(amount)
    
    def record_cart(self, items: int):
        """Record the cart size"""
        self.cart_size.observe(items)
    
    def record_checkout(self, duration: float):
        """Record the checkout time"""
        self.checkout_duration.observe(duration)

business_metrics = BusinessMetrics()
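
The `buckets` argument is easier to choose once it is clear how observations are counted. Prometheus histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound (`le`), and an implicit `+Inf` bucket counts everything. The sketch below imitates that counting in plain Python for the `cart_size` buckets. `cumulative_bucket_counts` is a hypothetical helper for illustration, not part of `prometheus_client`.

```python
import math
from bisect import bisect_left


def cumulative_bucket_counts(observations, buckets):
    """Return {upper_bound: count of observations <= upper_bound}."""
    bounds = sorted(buckets) + [math.inf]      # the +Inf bucket is always present
    per_bucket = [0] * len(bounds)
    for value in observations:
        # Index of the first bound that is >= value (bounds are inclusive)
        per_bucket[bisect_left(bounds, value)] += 1

    cumulative, running = {}, 0
    for bound, count in zip(bounds, per_bucket):
        running += count
        cumulative[bound] = running
    return cumulative


# Five hypothetical carts with 1, 3, 5, 12 and 60 items
counts = cumulative_bucket_counts([1, 3, 5, 12, 60], buckets=[1, 5, 10, 20, 50])
```

The 60-item cart only appears in the `+Inf` bucket, which is a hint that the largest finite bound may be too low for that workload.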

Prometheus Configuration:

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # API metrics
  - job_name: 'myapp-api'
    static_configs:
      - targets: ['api:3000']
    metrics_path: '/metrics'
    
  # Kubernetes pods
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__

# Alert rules
rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

Alert Rules:

# alerts.yml
groups:
  - name: api_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
      
      # High latency
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
          ) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P95 latency is {{ $value }}s"
      
      # Resources running low
      - alert: HighMemoryUsage
        expr: |
          (
            node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes
          ) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value | humanizePercentage }}"
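
The `HighLatency` rule relies on `histogram_quantile`, which estimates a quantile from cumulative bucket counts by linear interpolation inside the bucket that contains the target rank. The sketch below is a simplified plain-Python version of that estimate for classic histograms. It assumes at least one finite bucket plus the `+Inf` bucket, the bucket values are hypothetical, and it is not the Prometheus implementation.

```python
import math


def histogram_quantile(q, buckets):
    """Estimate the q-quantile from (upper_bound, cumulative_count) pairs.

    The list must include the (+inf, total) bucket.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    buckets = sorted(buckets)
    if not math.isinf(buckets[-1][0]):
        raise ValueError("the +Inf bucket is required")
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # First bucket whose cumulative count reaches the rank
    for index, (upper, cumulative) in enumerate(buckets):
        if cumulative >= rank:
            break
    if math.isinf(upper):
        return buckets[-2][0]   # cannot interpolate into +Inf: use the highest finite bound

    lower = buckets[index - 1][0] if index > 0 else 0.0
    below = buckets[index - 1][1] if index > 0 else 0.0
    # Assume observations are spread evenly inside the bucket
    return lower + (upper - lower) * (rank - below) / (cumulative - below)


# Hypothetical request-duration buckets (seconds, cumulative counts)
latency_buckets = [(0.1, 50), (0.5, 90), (1.0, 98), (math.inf, 100)]
p95 = histogram_quantile(0.95, latency_buckets)
alert_fires = p95 > 1   # same threshold as the HighLatency rule
```

Because the estimate is interpolated, its accuracy depends on how tightly the buckets bracket the threshold being alerted on.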

3. TRACES - OpenTelemetry:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# The Jaeger Thrift exporter is deprecated upstream; OTLP is used here instead
# (requires the opentelemetry-exporter-otlp package)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Configure the tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure the exporter (Jaeger accepts OTLP over gRPC on port 4317)
otlp_exporter = OTLPSpanExporter(
    endpoint='http://jaeger:4317',
    insecure=True,
)

trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)

# Auto-instrument Flask
FlaskInstrumentor().instrument_app(app)

# Auto-instrument outgoing HTTP requests
RequestsInstrumentor().instrument()

# Auto-instrument SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=db.engine)

# Manual traces
@app.route('/api/orders/<order_id>')
def get_order(order_id):
    with tracer.start_as_current_span('get_order') as span:
        # Add attributes to the span
        span.set_attribute('order.id', order_id)
        span.set_attribute('user.id', g.user['id'])
        
        # Look up the order
        with tracer.start_as_current_span('database.query'):
            order = db.orders.find_one({'_id': order_id})
        
        if not order:
            span.set_attribute('order.found', False)
            return jsonify({'error': 'Not found'}), 404
        
        span.set_attribute('order.found', True)
        span.set_attribute('order.total', order['total'])
        
        # Enrich with user data
        with tracer.start_as_current_span('enrich_order_data'):
            enriched_order = enrich_order(order)
        
        return jsonify(enriched_order)

def enrich_order(order):
    """Enrich the order with additional data"""
    with tracer.start_as_current_span('fetch_user_data') as span:
        span.set_attribute('user.id', order['user_id'])
        user = fetch_user(order['user_id'])
    
    with tracer.start_as_current_span('fetch_products'):
        products = fetch_products([item['product_id'] for item in order['items']])
    
    return {
        **order,
        'user': user,
        'products': products
    }

# Distributed tracing context propagation
import requests

def call_external_service(data):
    """Call an external service with trace context"""
    with tracer.start_as_current_span('external_api_call') as span:
        # RequestsInstrumentor injects the trace context into the outgoing headers
        response = requests.post(
            'https://external-service.com/api',
            json=data,
            timeout=5  # Avoid holding the span open on a stalled connection
        )
        
        span.set_attribute('http.status_code', response.status_code)
        
        return response.json()
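
What "propagated in the headers" means in practice: OpenTelemetry's default propagator writes a W3C Trace Context `traceparent` header of the form `version-traceid-parentid-flags`. The sketch below builds and parses a version-00 header by hand to show the format. `build_traceparent` and `parse_traceparent` are hypothetical helpers for illustration; real services would rely on the instrumentation rather than on this code.

```python
import re
from typing import NamedTuple, Optional

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00)
TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Format a version-00 traceparent header value."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if it is malformed."""
    match = TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid
    if version == "ff" or set(trace_id) == {"0"} or set(parent_id) == {"0"}:
        return None
    return TraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


header = build_traceparent("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7")
parsed = parse_traceparent(header)
```

The receiving service continues the same trace by reusing `trace_id` and recording `parent_id` as the parent of its own span.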

Recipe 4.10: Grafana Dashboards

Dashboard Configuration:

{
  "dashboard": {
    "title": "API Performance Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total[5m])) by (status)"
          }
        ],
        "type": "timeseries"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
          }
        ],
        "type": "stat",
        "fieldConfig": {
          "defaults": {
            "unit": "percentunit",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"value": 0, "color": "green"},
                {"value": 0.01, "color": "yellow"},
                {"value": 0.05, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "title": "Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint))"
          }
        ],
        "type": "timeseries"
      },
      {
        "title": "Active Users",
        "targets": [
          {
            "expr": "active_users"
          }
        ],
        "type": "stat"
      }
    ]
  }
}

Loki - Log Aggregation:

# promtail-config.yml (Log shipper)
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  # Docker container logs
  - job_name: docker
    static_configs:
      - targets:
          - localhost
        labels:
          job: docker
          __path__: /var/lib/docker/containers/*/*.log
    
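    pipeline_stages:
      # Unwrap Docker's json-file envelope ({"log": ..., "stream": ..., "time": ...})
      - docker: {}
      
      # Parse the application's JSON log line
      - json:
          expressions:
            level: level
            message: message
            timestamp: timestamp
      
      # Promote fields to labels
      - labels:
          level:
          
      # Use the timestamp carried in the log line
      - timestamp:
          source: timestamp
          format: RFC3339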

LogQL Queries (Loki Query Language):

# All API logs
{job="api"}

# Error logs
{job="api"} |= "ERROR"

# Logs for a specific user
{job="api"} | json | user_id="12345"

# Error count per endpoint (endpoint is extracted by the json parser)
sum by (endpoint) (
  count_over_time({job="api"} |= "ERROR" | json [5m])
)

# Average latency
avg_over_time({job="api"} | json | unwrap duration [5m])

# Top 10 slowest endpoints
topk(10,
  avg_over_time({job="api"} | json | unwrap duration [5m]) by (endpoint)
)

Recipe 4.11: SLIs, SLOs, SLAs, and Error Budgets

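Definitions:

- SLI (Service Level Indicator): a measured quantity that describes how the service behaves, such as the ratio of successful requests.
- SLO (Service Level Objective): the target an SLI must meet over a window, such as 99.9% availability over 30 days.
- SLA (Service Level Agreement): a commitment made to customers, typically with consequences when it is missed.
- Error budget: the unreliability an SLO tolerates, (1 - SLO) × total requests.

Example SLIs: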

from dataclasses import dataclass
from typing import List
from datetime import datetime, timedelta

@dataclass
class SLI:
    """Service Level Indicator"""
    name: str
    description: str
    query: str  # Prometheus query
    good_threshold: float  # Value considered "good"

# Define SLIs
SLIS = [
    SLI(
        name='availability',
        description='Percentage of successful requests',
        query='''
            sum(rate(http_requests_total{status!~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
        ''',
        good_threshold=0.99  # 99% of requests succeed
    ),
    SLI(
        name='latency',
        description='95th percentile latency',
        query='''
            histogram_quantile(0.95,
                sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
            )
        ''',
        good_threshold=0.5  # P95 < 500ms
    ),
    SLI(
        name='error_rate',
        description='Percentage of error responses',
        query='''
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
        ''',
        good_threshold=0.01  # < 1% errors
    )
]

@dataclass
class SLO:
    """Service Level Objective"""
    sli: SLI
    target: float  # Target (e.g. 0.999 for 99.9%)
    window: timedelta  # Measurement window (e.g. 30 days)

# Define SLOs
SLOS = [
    SLO(
        sli=SLIS[0],  # availability
        target=0.999,  # 99.9%
        window=timedelta(days=30)
    ),
    SLO(
        sli=SLIS[1],  # latency
        target=0.95,  # 95% of requests < 500ms
        window=timedelta(days=30)
    )
]

class ErrorBudget:
    """Error budget calculation"""
    
    def __init__(self, slo: SLO):
        self.slo = slo
    
    def calculate_budget(self, total_requests: int) -> dict:
        """
        Calculate the error budget
        
        Error Budget = (1 - SLO) × Total Requests
        """
        error_budget = (1 - self.slo.target) * total_requests
        
        return {
            'slo_target': self.slo.target,
            'total_requests': total_requests,
            'allowed_errors': int(error_budget),
            'allowed_downtime_minutes': self._calculate_downtime()
        }
    
    def _calculate_downtime(self) -> float:
        """Calculate the allowed downtime in minutes"""
        window_minutes = self.slo.window.total_seconds() / 60
        downtime = window_minutes * (1 - self.slo.target)
        return downtime
    
    def is_budget_exhausted(self, actual_errors: int, total_requests: int) -> bool:
        """Check whether the error budget is exhausted"""
        budget = self.calculate_budget(total_requests)
        return actual_errors >= budget['allowed_errors']

# Usage
availability_slo = SLOS[0]
error_budget = ErrorBudget(availability_slo)

# Over 30 days with 10M requests
budget = error_budget.calculate_budget(10_000_000)
print(f"Allowed errors: {budget['allowed_errors']}")  # 10000
print(f"Allowed downtime: {budget['allowed_downtime_minutes']:.2f} minutes")  # 43.20 minutes

# Check whether the budget is exhausted
actual_errors = 5000
is_exhausted = error_budget.is_budget_exhausted(actual_errors, 10_000_000)
print(f"Budget exhausted: {is_exhausted}")  # False

Error Budget-Based Alerting:

# error-budget-alerts.yml
groups:
  - name: error_budget
    rules:
      # 50% of the error budget consumed
      - alert: ErrorBudget50PercentConsumed
        expr: |
          (
            1 - (
              sum(rate(http_requests_total{status!~"5.."}[30d]))
              /
              sum(rate(http_requests_total[30d]))
            )
          ) / (1 - 0.999) > 0.5
        labels:
          severity: warning
        annotations:
          summary: "50% of error budget consumed"
          description: "We've used half our error budget for this month"
      
      # 90% of the error budget consumed
      - alert: ErrorBudget90PercentConsumed
        expr: |
          (
            1 - (
              sum(rate(http_requests_total{status!~"5.."}[30d]))
              /
              sum(rate(http_requests_total[30d]))
            )
          ) / (1 - 0.999) > 0.9
        labels:
          severity: critical
        annotations:
          summary: "90% of error budget consumed"
          description: "URGENT: Error budget almost exhausted. Freeze deployments."
      
      # High burn rate (budget is being consumed quickly)
      - alert: HighErrorBudgetBurnRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            /
            sum(rate(http_requests_total[1h]))
          ) > (14.4 * (1 - 0.999))
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error budget burn rate"
          description: "At this rate, we'll exhaust error budget in 2 days"
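
The 14.4 multiplier in `HighErrorBudgetBurnRate` follows from simple arithmetic: burn rate is the observed error rate divided by the error rate the SLO allows, and a constant burn rate of 14.4 uses 2% of a 30-day budget in one hour. The sketch below works through those numbers. The function names are hypothetical, and it assumes a constant error rate over the period.

```python
from datetime import timedelta


def burn_rate(error_rate: float, slo_target: float) -> float:
    """How many times faster than allowed the budget is being spent."""
    return error_rate / (1 - slo_target)


def time_to_exhaustion(rate: float, window: timedelta) -> timedelta:
    """Time until the whole budget is gone at a constant burn rate."""
    return window / rate


def budget_consumed(rate: float, elapsed: timedelta, window: timedelta) -> float:
    """Fraction of the budget spent after `elapsed` at a constant burn rate."""
    return rate * (elapsed / window)


window = timedelta(days=30)
threshold_error_rate = 14.4 * (1 - 0.999)            # the alert threshold, 1.44% errors
rate = burn_rate(threshold_error_rate, 0.999)        # about 14.4
exhausted_in = time_to_exhaustion(14.4, window)      # about 50 hours, roughly 2 days
consumed_in_one_hour = budget_consumed(14.4, timedelta(hours=1), window)  # about 0.02
```

The same functions can be used to derive thresholds for other windows, for example a slower burn rate checked over six hours.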

SLO Dashboard:

# Generate an SLO report
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta

class SLOReporter:
    """SLO report generator"""
    
    def __init__(self, prometheus_url: str):
        self.prom = PrometheusConnect(url=prometheus_url)
    
    def generate_report(self, slo: SLO) -> dict:
        """Generate an SLO report"""
        end_time = datetime.now()
        start_time = end_time - slo.window
        
        # Run the query
        result = self.prom.custom_query_range(
            query=slo.sli.query,
            start_time=start_time,
            end_time=end_time,
            step='1h'
        )
        
        # Compute compliance as the average of the hourly samples
        values = [float(point[1]) for point in result[0]['values']]
        avg_value = sum(values) / len(values)
        
        # Compare with the target. This assumes a ratio SLI where higher is
        # better (availability); a latency SLI measured in seconds would be
        # compared against sli.good_threshold instead.
        is_meeting_slo = avg_value >= slo.target
        
        # Compute the error budget
        total_requests = self._get_total_requests(start_time, end_time)
        error_budget = ErrorBudget(slo)
        budget_info = error_budget.calculate_budget(total_requests)
        
        return {
            'slo_name': slo.sli.name,
            'target': slo.target,
            'actual': avg_value,
            'is_meeting_slo': is_meeting_slo,
            'margin': avg_value - slo.target,
            'error_budget': budget_info,
            'period': {
                'start': start_time.isoformat(),
                'end': end_time.isoformat()
            }
        }
    
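    def _get_total_requests(self, start_time, end_time) -> int:
        """Get the total number of requests in the period"""
        # Derive the range from the period instead of hardcoding 30d
        window_seconds = int((end_time - start_time).total_seconds())
        query = f'sum(increase(http_requests_total[{window_seconds}s]))'
        result = self.prom.custom_query(query)
        return int(float(result[0]['value'][1]))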

# Usage
reporter = SLOReporter('http://prometheus:9090')

for slo in SLOS:
    report = reporter.generate_report(slo)
    
    print(f"\n=== {report['slo_name'].upper()} ===")
    print(f"Target: {report['target']*100}%")
    print(f"Actual: {report['actual']*100:.2f}%")
    print(f"Status: {'✅ MEETING' if report['is_meeting_slo'] else '❌ FAILING'}")
    print(f"Margin: {report['margin']*100:+.2f}%")
    print(f"Error Budget:")
    print(f"  Allowed errors: {report['error_budget']['allowed_errors']}")
    print(f"  Allowed downtime: {report['error_budget']['allowed_downtime_minutes']:.1f} min")

Recipe 4.12: Incident Response

On-Call Runbook:

# Incident Response Runbook

## Severity Levels

### SEV-1 (Critical)
- Complete service outage
- Data loss/corruption
- Security breach
- Response: Immediate (24/7)

### SEV-2 (High)
- Partial service degradation
- Major feature broken
- Performance severely impacted
- Response: Within 1 hour

### SEV-3 (Medium)
- Minor feature broken
- Workaround available
- Response: Within 4 hours

### SEV-4 (Low)
- Cosmetic issues
- Documentation errors
- Response: Next business day

## Incident Response Process

1. **Detection**
   - Alert fired
   - User report
   - Monitoring detected

2. **Triage**
   - Classify severity
   - Page on-call engineer
   - Create incident channel

3. **Mitigation**
   - Stop the bleeding
   - Implement workaround
   - Rollback if needed

4. **Investigation**
   - Identify root cause
   - Gather evidence
   - Document timeline

5. **Resolution**
   - Deploy fix
   - Verify resolution
   - Monitor for recurrence

6. **Post-Mortem**
   - Write incident report
   - Identify action items
   - Update runbooks

## Common Issues

### High Error Rate

**Symptoms:**
- Error rate > 5%
- 5xx responses increasing

**Investigation:**
```bash
# Check error logs
kubectl logs -l app=api --tail=100 | grep ERROR

# Check recent deployments
kubectl rollout history deployment/api

# Check resource usage
kubectl top pods -l app=api
```

**Mitigation:**

# Rollback deployment
kubectl rollout undo deployment/api

# Scale up
kubectl scale deployment/api --replicas=10

# Check database
psql -c "SELECT * FROM pg_stat_activity WHERE state = 'active';"

### High Latency

**Symptoms:**

**Investigation:**

# Check slow queries
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;

# Check cache hit rate
redis-cli info stats | grep keyspace

# Check external services
curl -w "@curl-format.txt" https://external-api.com/health

**Mitigation:**

### Database Connection Pool Exhausted

**Symptoms:**

**Investigation:**

-- Check active connections
SELECT count(*) FROM pg_stat_activity;

-- Check by state
SELECT state, count(*) FROM pg_stat_activity GROUP BY state;

-- Check long-running queries
SELECT pid, now() - query_start as duration, query
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY duration DESC;

**Mitigation:**

-- Kill long-running queries
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND state = 'active'
AND now() - query_start > interval '5 minutes';

## Escalation Path

  1. On-call engineer
  2. Team lead
  3. Engineering manager
  4. VP Engineering
  5. CTO

## Communication Templates

### Status Update

**Incident:** [Brief description]
**Severity:** SEV-X
**Status:** Investigating/Mitigating/Resolved
**Impact:** [Who/what is affected]
**ETA:** [Expected resolution time]
**Next Update:** [When]

### Resolution Message

**Incident Resolved**

**Root Cause:** [What went wrong]
**Fix:** [What we did]
**Prevention:** [How we'll prevent recurrence]
**Duration:** [How long it lasted]

**Incident Management Tool:**

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional

class Severity(Enum):
    SEV1 = "critical"
    SEV2 = "high"
    SEV3 = "medium"
    SEV4 = "low"

class IncidentStatus(Enum):
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"

@dataclass
class Incident:
    """Incident tracking"""
    id: str
    title: str
    severity: Severity
    status: IncidentStatus
    created_at: datetime
    detected_by: str
    assigned_to: Optional[str] = None
    description: str = ""
    timeline: Optional[List[dict]] = None
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    
    def __post_init__(self):
        if self.timeline is None:
            self.timeline = []
        
        self.add_timeline_event(
            f"Incident created by {self.detected_by}",
            IncidentStatus.INVESTIGATING
        )
    
    def add_timeline_event(self, description: str, new_status: Optional[IncidentStatus] = None):
        """Add an event to the timeline, optionally changing the status"""
        event = {
            'timestamp': datetime.utcnow(),
            'description': description
        }
        
        if new_status:
            event['status_change'] = {
                'from': self.status.value,
                'to': new_status.value
            }
            self.status = new_status
        
        self.timeline.append(event)
    
    def assign(self, engineer: str):
        """Assign the incident to an engineer"""
        self.assigned_to = engineer
        self.add_timeline_event(f"Assigned to {engineer}")
    
    def update_status(self, new_status: IncidentStatus, note: str):
        """Update the status and record a note in the timeline"""
        self.add_timeline_event(note, new_status)
    
    def resolve(self, root_cause: str, resolution: str):
        """Resolve the incident"""
        self.root_cause = root_cause
        self.resolution = resolution
        self.update_status(
            IncidentStatus.RESOLVED,
            f"Incident resolved. Root cause: {root_cause}"
        )
    
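    def generate_report(self) -> str:
        """Generate the incident report"""
        # Once resolved, measure up to the last timeline event (the resolution)
        # so the duration does not keep growing each time the report is rendered.
        if self.status is IncidentStatus.RESOLVED and self.timeline:
            end_time = self.timeline[-1]['timestamp']
        else:
            end_time = datetime.utcnow()
        duration = end_time - self.created_at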
        
        report = f"""
# Incident Report: {self.title}

**Incident ID:** {self.id}
**Severity:** {self.severity.value}
**Status:** {self.status.value}
**Duration:** {duration}
**Detected By:** {self.detected_by}
**Assigned To:** {self.assigned_to}

## Description
{self.description}

## Timeline
"""
        for event in self.timeline:
            report += f"\n- **{event['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}**: {event['description']}"
        
        if self.root_cause:
            report += f"\n\n## Root Cause\n{self.root_cause}"
        
        if self.resolution:
            report += f"\n\n## Resolution\n{self.resolution}"
        
        return report

# Usage
incident = Incident(
    id="INC-2024-001",
    title="High error rate in API",
    severity=Severity.SEV1,
    status=IncidentStatus.INVESTIGATING,
    created_at=datetime.utcnow(),
    detected_by="monitoring-system",
    description="Error rate spiked to 15% at 14:23 UTC"
)

incident.assign("alice@example.com")
incident.add_timeline_event("Identified database connection pool exhaustion")
incident.update_status(IncidentStatus.IDENTIFIED, "Root cause identified")
incident.add_timeline_event("Increased pool size from 20 to 50")
incident.update_status(IncidentStatus.MONITORING, "Monitoring for stability")

incident.resolve(
    root_cause="Database connection pool too small for traffic spike",
    resolution="Increased pool size and added auto-scaling"
)

print(incident.generate_report())
```

Congratulations! 🎉

You have completed PHASE 4: Security and Observability of the Roadmap del Desarrollador del Futuro.

What you have learned:

✅ OWASP Top 10 - Critical vulnerabilities
✅ Injection attacks - SQL, NoSQL, Command
✅ XSS, CSRF, and their protections
✅ Secure authentication and authorization
✅ MFA and RBAC
✅ Encryption and secrets management
✅ Static analysis - SonarQube, CodeQL
✅ Dependency scanning
✅ Structured logging
✅ Metrics with Prometheus
✅ Distributed tracing with OpenTelemetry
✅ Grafana dashboards
✅ SLIs, SLOs, SLAs, and Error Budgets
✅ Incident response and runbooks

Next steps:

PHASE 5: Data Engineering

PHASE 6: AI and Collaboration with AI


Version: 1.0
Date: 2024
Author: Roadmap del Desarrollador del Futuro
License: Educational use