🔒 Developer Cookbook - FASE 4: Ciberseguridad para Desarrolladores

Recetas prácticas para construir aplicaciones seguras, monitoreadas y observables


📚 Tabla de Contenidos

  1. Receta 4.1: OWASP Top 10 - Vulnerabilidades Críticas
  2. Receta 4.2: Injection Attacks - SQL, NoSQL, Command
  3. Receta 4.3: XSS (Cross-Site Scripting)
  4. Receta 4.4: CSRF (Cross-Site Request Forgery)
  5. Receta 4.5: Authentication & Authorization
  6. Receta 4.6: Encriptación y Gestión de Secretos

Ciberseguridad para Desarrolladores

Receta 4.1: OWASP Top 10 - Vulnerabilidades Críticas

¿Qué es OWASP Top 10? Lista de las 10 vulnerabilidades de seguridad más críticas en aplicaciones web, actualizada cada 3-4 años.

OWASP Top 10 (2021):

| # | Vulnerabilidad | Impacto | Prevalencia |
|---|----------------|---------|-------------|
| 1 | Broken Access Control | 🔴 Alto | 🔴 Muy común |
| 2 | Cryptographic Failures | 🔴 Alto | 🟡 Común |
| 3 | Injection | 🔴 Alto | 🟡 Común |
| 4 | Insecure Design | 🟡 Medio | 🟢 Moderado |
| 5 | Security Misconfiguration | 🟡 Medio | 🔴 Muy común |
| 6 | Vulnerable Components | 🟡 Medio | 🔴 Muy común |
| 7 | Identification/Auth Failures | 🔴 Alto | 🟡 Común |
| 8 | Software/Data Integrity | 🔴 Alto | 🟢 Moderado |
| 9 | Security Logging Failures | 🟡 Medio | 🟡 Común |
| 10 | Server-Side Request Forgery | 🟡 Medio | 🟢 Moderado |

Receta 4.2: Injection Attacks - SQL, NoSQL, Command

A3: Injection

SQL Injection

❌ VULNERABLE:

# Flask/Python - NEVER do this
@app.route('/user/<user_id>')
def get_user(user_id):
    # Vulnerable to SQL injection: user_id is interpolated straight into the SQL text
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)

    # Attack: /user/1 OR 1=1
    # Query: SELECT * FROM users WHERE id = 1 OR 1=1
    # Returns ALL users

✅ SEGURO - Prepared Statements:

# ✅ GOOD: use parameterized queries
@app.route('/user/<user_id>')
def get_user(user_id):
    """Return one user as JSON, looked up without building SQL from input.

    ``user_id`` comes from the URL and is untrusted; it is only ever handed
    to the database as a bound parameter. Responds with 404 when no user
    matches.
    """
    # Raw driver variant: the value is passed separately from the SQL text.
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))

    # Or with an ORM (SQLAlchemy)
    user = db.session.query(User).filter(User.id == user_id).first()
    if user is None:
        # Without this guard user.to_dict() raises AttributeError on an
        # unknown id and the client gets a 500 instead of a 404.
        return jsonify({'error': 'User not found'}), 404
    return jsonify(user.to_dict())

# ✅ GOOD: with additional validation
@app.route('/user/<int:user_id>')  # The int converter in the route validates that it is an int
def get_user(user_id):
    # Reject anything that is not an int, and ids below 1
    if not isinstance(user_id, int) or user_id < 1:
        return jsonify({'error': 'Invalid user ID'}), 400

    # The id is passed as a bound parameter, never formatted into the SQL text
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))

Casos comunes de SQL Injection:

# 1. Login bypass
# ❌ VULNERABLE
username = request.form['username']
password = request.form['password']
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"

# Attack: username = admin'--
# Query: SELECT * FROM users WHERE username='admin'-- AND password='...'
# The rest of the statement is commented out, so the password check is bypassed!

# ✅ SAFE
query = "SELECT * FROM users WHERE username=%s AND password=%s"
cursor.execute(query, (username, hashed_password))

# 2. UNION-based injection
# ❌ VULNERABLE
search = request.args.get('q')
query = f"SELECT title, content FROM articles WHERE title LIKE '%{search}%'"

# Attack: q=' UNION SELECT username, password FROM users--
# Extracts credentials!

# ✅ SAFE
# The LIKE wildcards are added to the bound value, not to the SQL text
query = "SELECT title, content FROM articles WHERE title LIKE %s"
cursor.execute(query, (f'%{search}%',))

# 3. Blind SQL Injection
# ❌ VULNERABLE
product_id = request.args.get('id')
query = f"SELECT * FROM products WHERE id={product_id}"

# Attack: id=1 AND (SELECT SLEEP(5))
# If the response takes 5 seconds, the endpoint is vulnerable!

# ✅ SAFE: prepared statements + validation
# request.args.get() yields None when the parameter is absent, so check that
# first; isascii() keeps isdigit() from accepting non-ASCII digit characters
# such as '²'.
if product_id is None or not (product_id.isascii() and product_id.isdigit()):
    return jsonify({'error': 'Invalid ID'}), 400

query = "SELECT * FROM products WHERE id=%s"
cursor.execute(query, (product_id,))

NoSQL Injection (MongoDB)

# ❌ VULNERABLE
@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')

    # Vulnerable to NoSQL injection: both values go into the filter with no type check
    user = db.users.find_one({
        'username': username,
        'password': password
    })

    # JSON attack payload:
    # {"username": {"$ne": null}, "password": {"$ne": null}}
    # Returns the first user (admin bypass)

# ✅ SAFE
@app.route('/login', methods=['POST'])
def login():
    """Authenticate with username/password taken from the JSON body.

    Responds 400 when either field is not a string, 401 when the user is
    unknown or the password does not match, otherwise returns a token.
    """
    body = request.json
    username = body.get('username')
    password = body.get('password')

    # Type check: only plain strings are accepted, so operator objects such
    # as {"$ne": null} never reach the query
    if not (isinstance(username, str) and isinstance(password, str)):
        return jsonify({'error': 'Invalid credentials'}), 400

    # Strip '$' from the input and match with an explicit $eq operator
    lookup = {'username': {'$eq': username.replace('$', '')}}
    user = db.users.find_one(lookup)

    if not user or not check_password_hash(user['password'], password):
        return jsonify({'error': 'Invalid credentials'}), 401

    return jsonify({'token': generate_token(user['_id'])})

Command Injection

# ❌ VULNERABLE
import subprocess

@app.route('/ping')
def ping():
    host = request.args.get('host')

    # Vulnerable to command injection: host is interpolated into a shell command line
    result = subprocess.run(f'ping -c 4 {host}', shell=True, capture_output=True)

    # Attack: host=google.com; cat /etc/passwd
    # Runs: ping -c 4 google.com; cat /etc/passwd

# ✅ SEGURO
import subprocess
import re

@app.route('/ping')
def ping():
    """Ping the host given in the ``host`` query parameter.

    Responds 400 when the parameter is missing or is not a plain host
    name / IP literal, 408 when ping does not finish within 10 seconds,
    otherwise returns the command's stdout as JSON.
    """
    host = request.args.get('host')

    # 1. Validate the format (domains/IPs only).
    #    - a missing parameter (None) is rejected instead of crashing re
    #    - fullmatch() does not let a trailing newline through, which
    #      match() with '$' does
    #    - the first character must be alphanumeric, so the value can never
    #      be read by ping as a command-line option (e.g. "-f")
    if not host or not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]{0,252}', host):
        return jsonify({'error': 'Invalid host'}), 400

    # 2. Do NOT use shell=True
    # 3. Pass the command as a list so host is a single argv entry
    try:
        result = subprocess.run(
            ['ping', '-c', '4', host],
            capture_output=True,
            text=True,
            timeout=10
        )
        return jsonify({'output': result.stdout})
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Timeout'}), 408

Prevención general:

class InputValidator:
    """Input validation helpers.

    These checks are an extra layer of defense; they do not replace
    parameterized queries or output escaping.
    """

    @staticmethod
    def sanitize_string(input_str: str, max_length: int = 255) -> str:
        """Return *input_str* truncated, stripped of risky characters and trimmed.

        Args:
            input_str: The raw value to clean.
            max_length: Maximum number of characters kept (applied before
                the character removal).

        Raises:
            ValueError: If *input_str* is not a string.
        """
        if not isinstance(input_str, str):
            raise ValueError("Input must be string")

        # Truncate first, then drop the dangerous characters in one pass
        dangerous_chars = '<>&"\';|`$'
        sanitized = input_str[:max_length].translate(
            str.maketrans('', '', dangerous_chars)
        )

        return sanitized.strip()

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True when *email* looks like ``local@domain.tld``.

        Non-string input (e.g. a missing JSON field) is reported as invalid
        instead of raising.
        """
        import re
        if not isinstance(email, str):
            return False
        # fullmatch() rejects a trailing newline, which match() + '$' accepts
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_username(username: str) -> bool:
        """Return True for 3-20 characters of letters, digits, '_' or '-'."""
        import re
        if not isinstance(username, str):
            return False
        return re.fullmatch(r'[a-zA-Z0-9_-]{3,20}', username) is not None

    @staticmethod
    def validate_sql_safe(value: str) -> bool:
        """Return True when *value* contains no SQL keywords or comment tokens.

        Keywords are matched case-insensitively and only when they are not
        part of a longer run of letters, so ordinary words such as "Gordon"
        or "order" are not rejected. Non-string input is reported as unsafe.
        """
        import re
        if not isinstance(value, str):
            return False

        upper_value = value.upper()

        # Tokens are written in upper case because they are compared against
        # the upper-cased value (a lower-case 'xp_' could never match).
        sql_tokens = ('--', '/*', '*/', 'XP_')
        if any(token in upper_value for token in sql_tokens):
            return False

        keyword_pattern = (
            r'(?<![A-Z])'
            r'(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|UNION|OR|AND)'
            r'(?![A-Z])'
        )
        return re.search(keyword_pattern, upper_value) is None

# Usage
validator = InputValidator()

@app.route('/register', methods=['POST'])
def register():
    """Validate the username and email of a registration request.

    Responds 400 with an error message when either field is invalid.
    """
    try:
        username = validator.sanitize_string(request.json.get('username'))
        email = request.json.get('email')

        if not validator.validate_username(username):
            return jsonify({'error': 'Invalid username'}), 400

        # Reject a missing or non-string email up front instead of handing
        # it to the regex-based validator
        if not isinstance(email, str) or not validator.validate_email(email):
            return jsonify({'error': 'Invalid email'}), 400

        # Continue with registration...
    except ValueError as e:
        # sanitize_string() raises ValueError for non-string input
        return jsonify({'error': str(e)}), 400

Receta 4.3: XSS (Cross-Site Scripting)

¿Qué es XSS? Inyección de código JavaScript malicioso en páginas web vistas por otros usuarios.

Tipos de XSS:

1. Reflected XSS (No persistente)

# ❌ VULNERABLE
@app.route('/search')
def search():
    query = request.args.get('q', '')

    # Vulnerable to XSS - renders the input without escaping
    return f"""
    <html>
        <body>
            <h1>Resultados para: {query}</h1>
        </body>
    </html>
    """

    # Attack: /search?q=<script>alert('XSS')</script>
    # The JavaScript gets executed!

# ✅ SEGURO
from markupsafe import escape

@app.route('/search')
def search():
    """Render the search results page with the query HTML-escaped."""
    query = request.args.get('q', '')

    # Escape HTML entities so the query is rendered as text, not markup
    safe_query = escape(query)

    # Better still: let a template engine with auto-escaping build the page,
    # i.e. replace the return below with
    #     return render_template('search.html', query=query)
    # (this was previously a second return statement that could never run).
    return f"""
    <html>
        <body>
            <h1>Resultados para: {safe_query}</h1>
        </body>
    </html>
    """
<!-- templates/search.html (Jinja2 auto-escapes by default) -->
<!DOCTYPE html>
<html>
<body>
    <h1>Resultados para: {{ query }}</h1>

    <!-- If you NEED unescaped HTML (DANGEROUS) -->
    <!-- <div>{{ content | safe }}</div> -->
</body>
</html>

2. Stored XSS (Persistente)

# ❌ VULNERABLE
@app.route('/comment', methods=['POST'])
def add_comment():
    comment = request.json.get('comment')

    # Stored without sanitizing
    db.comments.insert_one({
        'text': comment,
        'created_at': datetime.now()
    })

    return jsonify({'status': 'success'})

@app.route('/comments')
def get_comments():
    comments = list(db.comments.find())

    # Rendered without escaping: stored text is concatenated into the HTML as-is
    html = '<div>'
    for comment in comments:
        html += f"<p>{comment['text']}</p>"
    html += '</div>'

    return html

    # Attack: comment = "<img src=x onerror='alert(document.cookie)'>"
    # It runs for EVERY user who views the comments!

# ✅ SEGURO
import bleach

@app.route('/comment', methods=['POST'])
def add_comment():
    """Store a comment after reducing it to a small set of harmless tags.

    Responds 400 when the ``comment`` field is missing or not a string.
    """
    comment = request.json.get('comment')

    # bleach.clean() only accepts text, so a missing or non-string field
    # would otherwise surface as an unhandled exception
    if not isinstance(comment, str):
        return jsonify({'error': 'Invalid comment'}), 400

    # Sanitize the HTML, allowing only safe tags
    clean_comment = bleach.clean(
        comment,
        tags=['b', 'i', 'u', 'p', 'br'],  # Allowed tags
        attributes={},  # No attributes
        strip=True
    )

    db.comments.insert_one({
        'text': clean_comment,
        'created_at': datetime.now()
    })

    return jsonify({'status': 'success'})

@app.route('/comments')
def get_comments():
    """Render every stored comment through the auto-escaping template."""
    return render_template(
        'comments.html',
        comments=list(db.comments.find()),
    )

3. DOM-based XSS (Client-side)

<!-- ❌ VULNERABLE -->
<script>
function displayName() {
    // Read from the URL
    const name = new URLSearchParams(window.location.search).get('name');

    // Inserted into the DOM without sanitizing
    document.getElementById('greeting').innerHTML = 'Hello, ' + name;

    // Attack: ?name=<img src=x onerror='alert(1)'>
}
</script>

<!-- ✅ SAFE -->
<script>
function displayName() {
    const name = new URLSearchParams(window.location.search).get('name');

    // Use textContent instead of innerHTML
    document.getElementById('greeting').textContent = 'Hello, ' + name;

    // Or sanitize with DOMPurify (alternative to the line above; as written
    // this second assignment overwrites the first)
    const cleanName = DOMPurify.sanitize(name);
    document.getElementById('greeting').innerHTML = 'Hello, ' + cleanName;
}
</script>

Content Security Policy (CSP):

from flask import Flask, make_response

app = Flask(__name__)

@app.after_request
def set_csp(response):
    """Attach a Content-Security-Policy and related security headers.

    Runs after every request and returns the same response object with the
    headers set.
    """
    # NOTE(review): 'unsafe-inline' in script-src permits inline scripts,
    # which weakens the XSS protection CSP offers — consider nonces/hashes.
    csp = {
        "default-src": "'self'",
        "script-src": "'self' 'unsafe-inline' https://cdn.example.com",
        "style-src": "'self' 'unsafe-inline'",
        "img-src": "'self' data: https:",
        "font-src": "'self' https://fonts.gstatic.com",
        "connect-src": "'self' https://api.example.com",
        "frame-ancestors": "'none'",
        "base-uri": "'self'",
        "form-action": "'self'"
    }

    # Serialize as "directive value; directive value; ..."
    csp_string = '; '.join(f"{k} {v}" for k, v in csp.items())
    response.headers['Content-Security-Policy'] = csp_string

    # Other security headers
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    # The legacy browser XSS filter this header controls is deprecated and
    # could itself introduce vulnerabilities; current guidance is to switch
    # it off explicitly and rely on CSP instead.
    response.headers['X-XSS-Protection'] = '0'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'

    return response

Receta 4.4: CSRF (Cross-Site Request Forgery)

¿Qué es CSRF? Un atacante engaña a un usuario autenticado para que ejecute, sin saberlo, acciones no deseadas en un sitio donde ya tiene la sesión iniciada.

Escenario del ataque:

<!-- Malicious site: evil.com -->
<html>
<body>
    <h1>¡Ganaste un premio!</h1>

    <!-- Hidden form that performs an action on bank.com -->
    <form id="evil" action="https://bank.com/transfer" method="POST">
        <input type="hidden" name="to" value="attacker">
        <input type="hidden" name="amount" value="10000">
    </form>

    <script>
        // Auto-submit as soon as the user visits the page
        document.getElementById('evil').submit();
    </script>
</body>
</html>

<!-- If the user is authenticated on bank.com, the transfer goes through! -->

❌ VULNERABLE:

@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    # No CSRF protection: nothing ties this POST to a form served by this site
    to = request.form.get('to')
    amount = request.form.get('amount')

    # Perform the transfer
    execute_transfer(current_user, to, amount)

    return jsonify({'status': 'success'})

✅ SEGURO - CSRF Tokens:

from flask_wtf.csrf import CSRFProtect
import secrets

app = Flask(__name__)
# NOTE(review): this literal looks like a placeholder — confirm the real key
# is loaded from the environment or a secrets manager in deployments.
app.config['SECRET_KEY'] = 'your-secret-key'
csrf = CSRFProtect(app)

@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    # Flask-WTF verifies the CSRF token automatically
    to = request.form.get('to')
    amount = request.form.get('amount')

    execute_transfer(current_user, to, amount)

    return jsonify({'status': 'success'})
<!-- Template with CSRF token -->
<form method="POST" action="/transfer">
    <!-- CSRF token -->
    <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">

    <input type="text" name="to" placeholder="Destinatario">
    <input type="number" name="amount" placeholder="Monto">
    <button type="submit">Transferir</button>
</form>

Para APIs (SPA con AJAX):

# Backend: Generar token y enviarlo en cookie
from flask import jsonify, make_response

@app.route('/api/csrf-token')
def get_csrf_token():
    """Issue a fresh CSRF token.

    The token is stored in the session, returned in the JSON body and also
    set as a ``csrf_token`` cookie.
    """
    csrf_value = secrets.token_urlsafe(32)

    # Session copy: what later requests are compared against
    session['csrf_token'] = csrf_value

    resp = make_response(jsonify({'csrf_token': csrf_value}))

    # SameSite cookie carrying the same value
    cookie_options = {
        'httponly': False,  # JS needs to read it
        'secure': True,
        'samesite': 'Strict',
    }
    resp.set_cookie('csrf_token', csrf_value, **cookie_options)

    return resp

@app.route('/api/transfer', methods=['POST'])
@login_required
def api_transfer():
    """Execute a transfer after checking the CSRF token sent in the header.

    Responds 403 when the ``X-CSRF-Token`` header is missing or does not
    match the token stored in the session.
    """
    import hmac

    # Validate the token from the header against the session copy
    token_from_header = request.headers.get('X-CSRF-Token')
    token_from_session = session.get('csrf_token')

    # compare_digest() avoids leaking, through timing, how much of the token
    # matched. Both sides are encoded to bytes because compare_digest()
    # rejects str values containing non-ASCII characters.
    if (
        not token_from_header
        or not token_from_session
        or not hmac.compare_digest(
            token_from_header.encode('utf-8'),
            token_from_session.encode('utf-8'),
        )
    ):
        return jsonify({'error': 'CSRF token mismatch'}), 403

    # Process the transfer
    data = request.json
    execute_transfer(current_user, data['to'], data['amount'])

    return jsonify({'status': 'success'})
// Frontend: include the token in requests
async function transfer(to, amount) {
    // Ask the backend for a CSRF token
    const csrfReply = await fetch('/api/csrf-token');
    const csrfBody = await csrfReply.json();

    // Send it back in the X-CSRF-Token header
    const requestOptions = {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'X-CSRF-Token': csrfBody.csrf_token
        },
        body: JSON.stringify({ to, amount })
    };
    const transferReply = await fetch('/api/transfer', requestOptions);

    return transferReply.json();
}

SameSite Cookie Attribute:

# Additional protection against CSRF
@app.after_request
def set_cookie_samesite(response):
    """Make sure every cookie set by the response has a SameSite attribute.

    Cookies that already declare SameSite (any letter case) are left as
    they are; the others get ``SameSite=Lax`` appended.
    """
    cookies = response.headers.getlist('Set-Cookie')
    if cookies:
        # Replace the headers rather than adding to them: appending an
        # amended copy would leave the original cookie, without SameSite,
        # in the response as well.
        response.headers.remove('Set-Cookie')
        for cookie in cookies:
            if 'samesite' not in cookie.lower():
                cookie += '; SameSite=Lax'
            response.headers.add('Set-Cookie', cookie)

    return response

# SameSite options:
# - Strict: the cookie is never sent on cross-site requests (most secure)
# - Lax: the cookie is sent on top-level navigation (balanced)
# - None: the cookie is always sent (requires the Secure flag)

response.set_cookie(
    'session',
    session_id,
    httponly=True,
    secure=True,
    samesite='Strict'
)

Receta 4.5: Authentication & Authorization

A7: Identification and Authentication Failures

Autenticación Segura

from werkzeug.security import generate_password_hash, check_password_hash
import secrets
from datetime import datetime, timedelta
import jwt

class AuthManager:
    """Authentication helpers: password hashing, strength checks and JWTs."""

    # Placeholder signing key. In production: read it from an environment variable.
    SECRET_KEY = 'your-secret-key'

    @staticmethod
    def hash_password(password: str) -> str:
        """Return a salted hash of *password*."""
        # Use bcrypt or argon2 in production
        return generate_password_hash(password, method='pbkdf2:sha256', salt_length=16)

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Return True when *password* matches *password_hash*."""
        return check_password_hash(password_hash, password)

    @staticmethod
    def validate_password_strength(password: str) -> tuple[bool, str]:
        """Check *password* against the strength rules.

        Returns:
            ``(True, "Password is strong")`` when every rule passes,
            otherwise ``(False, <message for the first failed rule>)``.
            A non-string value is reported as invalid instead of raising.
        """
        if not isinstance(password, str):
            return False, "Password must be a string"

        if len(password) < 12:
            return False, "Password must be at least 12 characters"

        if not any(c.isupper() for c in password):
            return False, "Password must contain uppercase letter"

        if not any(c.islower() for c in password):
            return False, "Password must contain lowercase letter"

        if not any(c.isdigit() for c in password):
            return False, "Password must contain digit"

        if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in password):
            return False, "Password must contain special character"

        # Check against common passwords
        common_passwords = ['password123', 'qwerty123', '123456789']
        if password.lower() in common_passwords:
            return False, "Password is too common"

        return True, "Password is strong"

    @staticmethod
    def generate_jwt(user_id: str, expires_in: int = 3600) -> str:
        """Return an HS256-signed JWT for *user_id*.

        Args:
            user_id: Value stored in the ``user_id`` claim.
            expires_in: Lifetime in seconds, counted from now.
        """
        from datetime import timezone

        # One timezone-aware timestamp for both claims: datetime.utcnow() is
        # deprecated and two separate calls make iat and exp drift apart.
        now = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'exp': now + timedelta(seconds=expires_in),
            'iat': now
        }

        return jwt.encode(payload, AuthManager.SECRET_KEY, algorithm='HS256')

    @staticmethod
    def verify_jwt(token: str) -> dict:
        """Verify *token* and report the outcome as a dict.

        Returns:
            ``{'valid': True, 'user_id': ...}`` on success, otherwise
            ``{'valid': False, 'error': 'Token expired' | 'Invalid token'}``.
        """
        try:
            payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return {'valid': False, 'error': 'Token expired'}
        except jwt.InvalidTokenError:
            return {'valid': False, 'error': 'Invalid token'}

        # A correctly signed token that lacks the user_id claim is reported
        # as invalid rather than escaping as a KeyError.
        if 'user_id' not in payload:
            return {'valid': False, 'error': 'Invalid token'}
        return {'valid': True, 'user_id': payload['user_id']}

# Usage
@app.route('/register', methods=['POST'])
def register():
    """Create a user from the JSON body after validating the password.

    Responds 400 when email or password is missing or not a string, or when
    the password fails the strength rules.
    """
    data = request.json
    password = data.get('password')
    email = data.get('email')

    # Missing fields are a client error; without this check they surface as
    # an unhandled TypeError/KeyError further down.
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({'error': 'Email and password are required'}), 400

    # Validate strength
    is_valid, message = AuthManager.validate_password_strength(password)
    if not is_valid:
        return jsonify({'error': message}), 400

    # Hash the password; only the hash is stored
    password_hash = AuthManager.hash_password(password)

    # Create the user
    user = db.users.insert_one({
        'email': email,
        'password': password_hash,
        'created_at': datetime.utcnow()
    })

    return jsonify({'status': 'success', 'user_id': str(user.inserted_id)})

@app.route('/login', methods=['POST'])
def login():
    """Exchange email/password for a JWT.

    Responds 429 when the client address is rate limited and 401, with a
    generic message, when the credentials do not match.
    """
    data = request.json

    # Rate limiting (brute-force protection)
    if is_rate_limited(request.remote_addr):
        return jsonify({'error': 'Too many attempts'}), 429

    # Look the user up, then check the password only if one was found
    user = db.users.find_one({'email': data['email']})
    authenticated = bool(user) and AuthManager.verify_password(
        data['password'], user['password']
    )

    if not authenticated:
        # Record the failed attempt
        log_failed_login(request.remote_addr, data['email'])

        # Generic message (does not reveal whether the user exists)
        return jsonify({'error': 'Invalid credentials'}), 401

    # Issue the token
    return jsonify({'token': AuthManager.generate_jwt(str(user['_id']))})

Multi-Factor Authentication (MFA)

import pyotp
import qrcode
from io import BytesIO
import base64

class MFAManager:
    """TOTP-based multi-factor authentication helpers."""

    @staticmethod
    def generate_secret() -> str:
        """Return a new random base32 secret for TOTP."""
        return pyotp.random_base32()

    @staticmethod
    def generate_qr_code(secret: str, user_email: str, issuer: str = 'MyApp') -> str:
        """Return a PNG QR code of the provisioning URI as a ``data:`` URL."""
        uri = pyotp.TOTP(secret).provisioning_uri(
            name=user_email,
            issuer_name=issuer
        )

        # Build the QR code from the URI
        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(uri)
        code.make(fit=True)
        picture = code.make_image(fill_color="black", back_color="white")

        # Serialize to PNG in memory, then base64-encode
        png = BytesIO()
        picture.save(png, format='PNG')
        encoded = base64.b64encode(png.getvalue()).decode()

        return f"data:image/png;base64,{encoded}"

    @staticmethod
    def verify_totp(secret: str, code: str) -> bool:
        """Return True when *code* is valid for *secret*."""
        # valid_window=1 also accepts the code before/after the current one
        return pyotp.TOTP(secret).verify(code, valid_window=1)

# Endpoints
@app.route('/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
    """Start MFA enrollment for the current user.

    Stores a new secret with ``mfa_enabled`` set to False and returns the
    QR code together with the secret itself.
    """
    owner_id = current_user['_id']
    new_secret = MFAManager.generate_secret()

    # Persist the secret (encrypt it in production); MFA stays disabled
    # until a code has been verified
    pending_state = {'mfa_secret': new_secret, 'mfa_enabled': False}
    db.users.update_one({'_id': owner_id}, {'$set': pending_state})

    qr_image = MFAManager.generate_qr_code(new_secret, current_user['email'])

    return jsonify({
        'qr_code': qr_image,
        'secret': new_secret  # For manual entry
    })

@app.route('/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
    """Confirm the first TOTP code, enable MFA and hand out backup codes.

    Responds 400 when the submitted code is not valid.
    """
    submitted = request.json.get('code')
    account = db.users.find_one({'_id': current_user['_id']})

    if not MFAManager.verify_totp(account['mfa_secret'], submitted):
        return jsonify({'error': 'Invalid code'}), 400

    owner = {'_id': current_user['_id']}

    # Turn MFA on
    db.users.update_one(owner, {'$set': {'mfa_enabled': True}})

    # Generate backup codes; only their hashes are stored
    backup_codes = [secrets.token_hex(8) for _ in range(10)]
    hashed_codes = [hash_code(c) for c in backup_codes]
    db.users.update_one(owner, {'$set': {'backup_codes': hashed_codes}})

    return jsonify({
        'status': 'success',
        'backup_codes': backup_codes  # Shown ONCE
    })

@app.route('/login/mfa', methods=['POST'])
def login_with_mfa():
    """Complete a login by checking a TOTP code or a backup code.

    Responds 401 when the temporary token is invalid, when the user it
    names cannot be used for MFA, or when the code does not match.
    """
    data = request.json
    temp_token = data.get('temp_token')
    mfa_code = data.get('mfa_code')

    # Check the temporary token
    payload = verify_temp_token(temp_token)
    if not payload:
        return jsonify({'error': 'Invalid token'}), 401

    user = db.users.find_one({'_id': payload['user_id']})

    # The token may name a user that no longer exists or has no MFA secret
    # stored; treat both as an invalid token instead of crashing.
    if not user or not user.get('mfa_secret'):
        return jsonify({'error': 'Invalid token'}), 401

    if not isinstance(mfa_code, str):
        return jsonify({'error': 'Invalid MFA code'}), 401

    # Accept either a current TOTP code or a backup code (checked in that order)
    if MFAManager.verify_totp(user['mfa_secret'], mfa_code) or verify_backup_code(user, mfa_code):
        token = AuthManager.generate_jwt(str(user['_id']))
        return jsonify({'token': token})

    return jsonify({'error': 'Invalid MFA code'}), 401

Role-Based Access Control (RBAC)

from functools import wraps
from flask import g

class Role:
    """String constants naming the roles a user can hold."""
    ADMIN = 'admin'
    MODERATOR = 'moderator'
    USER = 'user'
    GUEST = 'guest'

class Permission:
    """String constants naming the permissions a role can grant."""
    READ = 'read'
    WRITE = 'write'
    DELETE = 'delete'
    ADMIN = 'admin'

# Permission matrix: the permissions granted to each role
ROLE_PERMISSIONS = {
    Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
    Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
    Role.USER: [Permission.READ, Permission.WRITE],
    Role.GUEST: [Permission.READ]
}

def require_permission(permission: str):
    """Decorator factory: only run the view when the user has *permission*.

    The wrapped view responds 401 when no user is attached to ``g`` and 403
    when the user's role does not grant *permission*. A user without a
    ``role`` entry is treated as a guest.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr() with a default: g.user raises AttributeError when
            # nothing has set it for this request
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401

            user_role = user.get('role', Role.GUEST)
            user_permissions = ROLE_PERMISSIONS.get(user_role, [])

            if permission not in user_permissions:
                return jsonify({'error': 'Insufficient permissions'}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator

def require_role(role: str):
    """Decorator factory: only run the view when the user's role is *role*.

    The wrapped view responds 401 when no user is attached to ``g`` and 403
    when the user's role differs from *role*.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr() with a default: g.user raises AttributeError when
            # nothing has set it for this request
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401

            if user.get('role') != role:
                return jsonify({'error': 'Insufficient permissions'}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator

# Usage
@app.route('/admin/users')
@require_role(Role.ADMIN)
def list_users():
    """Return every user as JSON, without the password field (admins only)."""
    users = list(db.users.find({}, {'password': 0}))
    # Stringify _id the same way the other endpoints in this file do
    # (str(user['_id']), str(user.inserted_id)).
    # NOTE(review): assumes _id is a MongoDB ObjectId, which jsonify cannot
    # serialize by default — confirm against the app's JSON provider.
    return jsonify([{**user, '_id': str(user['_id'])} for user in users])

@app.route('/posts/<post_id>', methods=['DELETE'])
@require_permission(Permission.DELETE)
def delete_post(post_id):
    """Delete a post; requires the DELETE permission (admins and moderators).

    Responds 404 when the post does not exist.
    """
    post = db.posts.find_one({'_id': post_id})
    if post is None:
        # Without this guard the ownership check below fails with a
        # TypeError on an unknown id
        return jsonify({'error': 'Post not found'}), 404

    # Ownership check (users may delete their own posts).
    # NOTE(review): ROLE_PERMISSIONS does not grant DELETE to Role.USER, so
    # with the decorator above this branch is never reached by a plain user.
    if g.user['role'] == Role.USER and post['author_id'] != g.user['_id']:
        return jsonify({'error': 'Can only delete own posts'}), 403

    db.posts.delete_one({'_id': post_id})
    return jsonify({'status': 'success'})

Receta 4.6: Encriptación y Gestión de Secretos

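Encriptación en tránsito vs en reposo: en tránsito los datos se protegen con TLS/HTTPS; el siguiente ejemplo cubre la encriptación en reposo (datos almacenados), usando Fernet.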

import base64
import os

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class EncryptionManager:
    """Symmetric encryption helpers built on Fernet."""

    @staticmethod
    def generate_key() -> bytes:
        """Return a new random Fernet key."""
        return Fernet.generate_key()

    @staticmethod
    def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
        """Derive a Fernet key from *password* with PBKDF2 (HMAC-SHA256).

        Args:
            password: The password to stretch.
            salt: Salt to use; 16 random bytes are generated when omitted.

        Returns:
            ``(key, salt)`` — keep the salt, it is needed to derive the same
            key again.
        """
        if salt is None:
            salt = os.urandom(16)

        # The cryptography package names this KDF class PBKDF2HMAC; it has
        # no class called PBKDF2.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000
        )
        # Fernet expects the 32-byte key url-safe base64 encoded
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

    @staticmethod
    def encrypt(data: str, key: bytes) -> str:
        """Encrypt *data* with *key* and return the token as text."""
        f = Fernet(key)
        encrypted = f.encrypt(data.encode())
        return encrypted.decode()

    @staticmethod
    def decrypt(encrypted_data: str, key: bytes) -> str:
        """Decrypt a token produced by :meth:`encrypt` and return the text."""
        f = Fernet(key)
        decrypted = f.decrypt(encrypted_data.encode())
        return decrypted.decode()

# Example: encrypting sensitive data in the DB
class UserSecrets:
    """Store and read a user's sensitive payment data, encrypted."""

    def __init__(self, encryption_key: bytes):
        self.encryption_manager = EncryptionManager()
        self.key = encryption_key

    def store_credit_card(self, user_id: str, card_number: str, cvv: str):
        """Store the card number encrypted, plus its last four digits.

        Args:
            user_id: Owner of the payment method.
            card_number: Full card number; stored encrypted only.
            cvv: Accepted so existing callers keep working, but never
                written to the database.
        """
        encrypted_card = self.encryption_manager.encrypt(card_number, self.key)

        # The CVV is deliberately not persisted, not even encrypted: card
        # verification codes are not meant to be retained once a payment has
        # been authorized, and nothing in this class reads one back.
        # NOTE(review): confirm against the applicable PCI DSS requirements.

        # Store (only the last 4 digits in plain text, for the UI)
        db.user_payment_methods.insert_one({
            'user_id': user_id,
            'card_number_encrypted': encrypted_card,
            'last_four': card_number[-4:],
            'created_at': datetime.utcnow()
        })

    def get_credit_card(self, user_id: str) -> dict:
        """Return the decrypted card for *user_id*, or None when there is none.

        The result holds ``card_number`` (decrypted) and ``last_four``.
        """
        card = db.user_payment_methods.find_one({'user_id': user_id})

        if not card:
            return None

        card_number = self.encryption_manager.decrypt(
            card['card_number_encrypted'],
            self.key
        )

        return {
            'card_number': card_number,
            'last_four': card['last_four']
        }

Gestión de Secretos:

# ❌ NEVER do this
API_KEY = "sk_live_abc123"  # Hardcoded in the source code
DATABASE_PASSWORD = "mypassword123"

# ✅ GOOD: environment variables
import os
from dotenv import load_dotenv

load_dotenv()

# os.getenv() returns None when the variable is not set
API_KEY = os.getenv('API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')

# ✅ MEJOR: Gestores de secretos (AWS Secrets Manager, HashiCorp Vault)
import boto3
import json

class SecretsManager:
    """Thin wrapper around AWS Secrets Manager with an in-memory cache."""

    def __init__(self, region_name='us-east-1'):
        self._cache = {}
        self.client = boto3.client('secretsmanager', region_name=region_name)

    def get_secret(self, secret_name: str) -> dict:
        """Return the JSON-decoded secret, serving repeat lookups from the cache.

        Any error while fetching or decoding is printed and re-raised.
        """
        try:
            return self._cache[secret_name]
        except KeyError:
            pass  # not cached yet: fetch it below

        try:
            raw = self.client.get_secret_value(SecretId=secret_name)['SecretString']
            parsed = json.loads(raw)
        except Exception as e:
            print(f"Error retrieving secret: {e}")
            raise

        self._cache[secret_name] = parsed
        return parsed

    def rotate_secret(self, secret_name: str, new_value: dict):
        """Write *new_value* as the secret's value and drop the cached copy."""
        serialized = json.dumps(new_value)
        self.client.update_secret(SecretId=secret_name, SecretString=serialized)

        # Invalidate the cache entry, if there is one
        self._cache.pop(secret_name, None)

# Usage
from urllib.parse import quote

# Named secrets_manager rather than "secrets" so it does not shadow the
# standard-library secrets module used elsewhere (secrets.token_hex, ...)
secrets_manager = SecretsManager()

db_creds = secrets_manager.get_secret('myapp/database')
# Percent-encode the credentials: a generated password may contain '@', ':'
# or '/', which would otherwise corrupt the URL
DATABASE_URL = (
    f"postgresql://{quote(db_creds['username'], safe='')}"
    f":{quote(db_creds['password'], safe='')}"
    f"@{db_creds['host']}/{db_creds['database']}"
)

api_keys = secrets_manager.get_secret('myapp/api-keys')
STRIPE_KEY = api_keys['stripe']
SENDGRID_KEY = api_keys['sendgrid']

Terraform - Gestión de secretos:

# secrets.tf

# Generate a random password (32 characters, special characters included)
resource "random_password" "db_password" {
  length  = 32
  special = true
}

# Store in AWS Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
  name = "myapp/database"

  recovery_window_in_days = 7
}

# Secret value: a JSON object with username, password, host and database
resource "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id = aws_secretsmanager_secret.db_credentials.id

  secret_string = jsonencode({
    username = "postgres"
    password = random_password.db_password.result
    host     = aws_db_instance.main.endpoint
    database = "myapp"
  })
}

# IAM policy granting access: allows GetSecretValue on the db_credentials secret only
resource "aws_iam_role_policy" "secrets_access" {
  role = aws_iam_role.ecs_task.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = [
          aws_secretsmanager_secret.db_credentials.arn
        ]
      }
    ]
  })
}

# ECS task with secrets
resource "aws_ecs_task_definition" "api" {
  family = "myapp-api"

  container_definitions = jsonencode([
    {
      name  = "api"
      image = "myapp:latest"

      # NOTE(review): valueFrom points at the whole secret, whose value is the
      # JSON object written by aws_secretsmanager_secret_version.db_credentials,
      # so DATABASE_URL presumably receives that JSON text rather than a
      # connection URL — confirm what the application expects.
      secrets = [
        {
          name      = "DATABASE_URL"
          valueFrom = aws_secretsmanager_secret.db_credentials.arn
        }
      ]
    }
  ])
}

Versión: 1.0 · Fecha: 2024 · Autor: Roadmap del Desarrollador del Futuro · Licencia: Uso educativo