🔒 Developer Cookbook - FASE 4: Ciberseguridad para Desarrolladores
Recetas prácticas para construir aplicaciones seguras, monitoreadas y observables
📚 Tabla de Contenidos
- Receta 4.1: OWASP Top 10 - Vulnerabilidades Críticas
- Receta 4.2: Injection Attacks - SQL, NoSQL, Command
- Receta 4.3: XSS (Cross-Site Scripting)
- Receta 4.4: CSRF (Cross-Site Request Forgery)
- Receta 4.5: Authentication & Authorization
- Receta 4.6: Encriptación y Gestión de Secretos
Ciberseguridad para Desarrolladores
Receta 4.1: OWASP Top 10 - Vulnerabilidades Críticas
¿Qué es OWASP Top 10? Lista de las 10 vulnerabilidades de seguridad más críticas en aplicaciones web, actualizada cada 3-4 años.
OWASP Top 10 (2021):
| # | Vulnerabilidad | Impacto | Prevalencia |
|---|---|---|---|
| 1 | Broken Access Control | 🔴 Alto | 🔴 Muy común |
| 2 | Cryptographic Failures | 🔴 Alto | 🟡 Común |
| 3 | Injection | 🔴 Alto | 🟡 Común |
| 4 | Insecure Design | 🟡 Medio | 🟢 Moderado |
| 5 | Security Misconfiguration | 🟡 Medio | 🔴 Muy común |
| 6 | Vulnerable Components | 🟡 Medio | 🔴 Muy común |
| 7 | Identification/Auth Failures | 🔴 Alto | 🟡 Común |
| 8 | Software/Data Integrity | 🔴 Alto | 🟢 Moderado |
| 9 | Security Logging Failures | 🟡 Medio | 🟡 Común |
| 10 | Server-Side Request Forgery | 🟡 Medio | 🟢 Moderado |
Receta 4.2: Injection Attacks - SQL, NoSQL, Command
A3: Injection
SQL Injection
❌ VULNERABLE:
# Flask/Python - NEVER do this
@app.route('/user/<user_id>')
def get_user(user_id):
    """Deliberately vulnerable lookup: the URL segment is pasted into the SQL text."""
    # Vulnerable to SQL injection: user_id reaches the statement unescaped
    cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
    # Attack: /user/1 OR 1=1
    # Query: SELECT * FROM users WHERE id = 1 OR 1=1
    # Returns ALL users
✅ SEGURO - Prepared Statements:
# ✅ GOOD: use parameterized queries
@app.route('/user/<user_id>')
def get_user(user_id):
    """Return one user as JSON without building SQL from request input.

    Args:
        user_id: Raw ``<user_id>`` segment of the URL (untrusted).

    Returns:
        The user's ``to_dict()`` payload as JSON, or a 404 JSON error when
        no row matches.
    """
    # The value travels separately from the SQL text, so it is never parsed
    # as SQL.
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))
    # Or with an ORM (SQLAlchemy), which binds the value the same way
    user = db.session.query(User).filter(User.id == user_id).first()
    if user is None:
        # .first() yields None when nothing matches; answer 404 instead of
        # crashing on None.to_dict() with a 500.
        return jsonify({'error': 'User not found'}), 404
    return jsonify(user.to_dict())
# ✅ GOOD: with additional validation
@app.route('/user/<int:user_id>')  # the int URL converter only matches integers
def get_user(user_id):
    """Look up a user by numeric id, rejecting ids below 1 with a 400."""
    id_is_valid = isinstance(user_id, int) and user_id >= 1
    if not id_is_valid:
        return jsonify({'error': 'Invalid user ID'}), 400
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
Casos comunes de SQL Injection:
# 1. Login bypass
# ❌ VULNERABLE: both form values are interpolated straight into the SQL text
username = request.form['username']
password = request.form['password']
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
# Attack: username = admin'--
# Query: SELECT * FROM users WHERE username='admin'-- AND password='...'
# The rest of the statement is commented out, bypassing the password check!
# ✅ SAFE: placeholders keep the values out of the SQL text
# NOTE(review): hashed_password is not defined in this snippet - presumably
# derived from `password` elsewhere; confirm.
query = "SELECT * FROM users WHERE username=%s AND password=%s"
cursor.execute(query, (username, hashed_password))
# 2. UNION-based injection
# ❌ VULNERABLE: the search term is interpolated into the LIKE pattern
search = request.args.get('q')
query = f"SELECT title, content FROM articles WHERE title LIKE '%{search}%'"
# Attack: q=' UNION SELECT username, password FROM users--
# Extracts credentials!
# ✅ SAFE: the wildcard pattern is built in Python and passed as a bound parameter
query = "SELECT title, content FROM articles WHERE title LIKE %s"
cursor.execute(query, (f'%{search}%',))
# 3. Blind SQL Injection
# ❌ VULNERABLE
product_id = request.args.get('id')
query = f"SELECT * FROM products WHERE id={product_id}"
# Ataque: id=1 AND (SELECT SLEEP(5))
# Si tarda 5 segundos, vulnerable!
# ✅ SEGURO: Prepared statements + validación
if not product_id.isdigit():
return jsonify({'error': 'Invalid ID'}), 400
query = "SELECT * FROM products WHERE id=%s"
cursor.execute(query, (product_id,))
NoSQL Injection (MongoDB)
# ❌ VULNERABLE
@app.route('/login', methods=['POST'])
def login():
    """Deliberately vulnerable: JSON values go into the Mongo filter untyped."""
    # Vulnerable to NoSQL injection: either value may be an operator document
    credentials = {
        'username': request.json.get('username'),
        'password': request.json.get('password'),
    }
    user = db.users.find_one(credentials)
    # Attack JSON:
    # {"username": {"$ne": null}, "password": {"$ne": null}}
    # Returns the first user (admin bypass)
# ✅ SAFE
@app.route('/login', methods=['POST'])
def login():
    """Authenticate with type-checked credentials and a hashed password check.

    Returns a token on success, 400 when either field is not a string, and
    401 when the user is unknown or the password does not verify.
    """
    username = request.json.get('username')
    password = request.json.get('password')
    # Type check: an operator document such as {"$ne": null} is not a str
    if not (isinstance(username, str) and isinstance(password, str)):
        return jsonify({'error': 'Invalid credentials'}), 400
    # Sanitize the input (drop operator markers)
    username = username.replace('$', '')
    # Use an explicit operator so the value is always compared as data
    user = db.users.find_one({'username': {'$eq': username}})
    if not user or not check_password_hash(user['password'], password):
        return jsonify({'error': 'Invalid credentials'}), 401
    return jsonify({'token': generate_token(user['_id'])})
Command Injection
# ❌ VULNERABLE
import subprocess


@app.route('/ping')
def ping():
    """Deliberately vulnerable: the host is spliced into a shell command line."""
    host = request.args.get('host')
    # Vulnerable to command injection: the shell interprets the whole string
    command = f'ping -c 4 {host}'
    result = subprocess.run(command, shell=True, capture_output=True)
    # Attack: host=google.com; cat /etc/passwd
    # Runs: ping -c 4 google.com; cat /etc/passwd
# ✅ SAFE
import subprocess
import re


@app.route('/ping')
def ping():
    """Ping a caller-supplied host without going through a shell.

    Returns:
        JSON with the command's stdout, a 400 JSON error when ``host`` is
        missing or malformed, or a 408 JSON error when ping exceeds 10 s.
    """
    host = request.args.get('host')
    # 1. Validate the format (hostnames/IPs only).
    #    - `not host` covers a missing parameter (None) and an empty string.
    #    - The first character must be alphanumeric: a leading '-' would be
    #      read by ping as an option rather than a host.
    #    - fullmatch() is used because '$' also matches before a trailing
    #      newline.
    #    - 253 is the maximum length of a DNS name.
    if (
        not host
        or len(host) > 253
        or not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]*', host)
    ):
        return jsonify({'error': 'Invalid host'}), 400
    # 2. Do NOT use shell=True
    # 3. Pass the command as a list so each element is a single argument
    try:
        result = subprocess.run(
            ['ping', '-c', '4', host],
            capture_output=True,
            text=True,
            timeout=10
        )
        return jsonify({'output': result.stdout})
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Timeout'}), 408
Prevención general:
class InputValidator:
    """Input validation helpers.

    These checks are defence in depth only; they do not replace
    parameterized queries or output escaping.
    """

    @staticmethod
    def sanitize_string(input_str: str, max_length: int = 255) -> str:
        """Truncate a string and remove characters used in markup/shell syntax.

        Args:
            input_str: Value to clean.
            max_length: Maximum number of characters kept (applied before
                the dangerous characters are removed).

        Returns:
            The cleaned string with surrounding whitespace stripped.

        Raises:
            ValueError: If ``input_str`` is not a ``str``.
        """
        if not isinstance(input_str, str):
            raise ValueError("Input must be string")
        # Truncate first so the limit applies to the raw input
        truncated = input_str[:max_length]
        # Remove dangerous characters in a single pass
        dangerous_chars = '<>&"\';|`$'
        removal_table = {ord(char): None for char in dangerous_chars}
        return truncated.translate(removal_table).strip()

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True when ``email`` looks like ``local@domain.tld``.

        Non-string input (for example a missing JSON field) yields False.
        """
        import re
        if not isinstance(email, str):
            return False
        # fullmatch(): with match() and '$', "user@example.com\n" would pass
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_username(username: str) -> bool:
        """Return True for 3-20 characters of letters, digits, '_' or '-'.

        Non-string input yields False.
        """
        import re
        if not isinstance(username, str):
            return False
        # fullmatch() rejects a trailing newline that '$' would tolerate
        return re.fullmatch(r'[a-zA-Z0-9_-]{3,20}', username) is not None

    @staticmethod
    def validate_sql_safe(value: str) -> bool:
        """Return True when ``value`` contains no SQL keywords or comment markers.

        Keywords are matched as whole words, case-insensitively, so ordinary
        text such as "Sandra" or "Victor" is not rejected for containing
        "AND"/"OR". Comment markers and the ``xp_`` prefix are matched
        anywhere. Non-string input yields False.
        """
        import re
        if not isinstance(value, str):
            return False
        keyword_pattern = (
            r'\b(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|UNION|OR|AND)\b'
            r'|--|/\*|\*/|xp_'
        )
        return re.search(keyword_pattern, value, re.IGNORECASE) is None
# Usage
validator = InputValidator()


@app.route('/register', methods=['POST'])
def register():
    """Validate the username and email of a registration request.

    Returns a 400 JSON error for the first failing check, or when
    sanitizing raises ValueError.
    """
    try:
        username = validator.sanitize_string(request.json.get('username'))
        email = request.json.get('email')
        # Checks run in order; the first failure decides the error message
        checks = (
            (validator.validate_username, username, 'Invalid username'),
            (validator.validate_email, email, 'Invalid email'),
        )
        for is_valid, value, message in checks:
            if not is_valid(value):
                return jsonify({'error': message}), 400
        # Continue with registration...
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
Receta 4.3: XSS (Cross-Site Scripting)
¿Qué es XSS? Inyección de código JavaScript malicioso en páginas web vistas por otros usuarios.
Tipos de XSS:
1. Reflected XSS (No persistente)
# ❌ VULNERABLE
@app.route('/search')
def search():
    """Deliberately vulnerable: echoes the query string into HTML unescaped."""
    query = request.args.get('q', '')
    # XSS: the input is rendered without escaping
    page = (
        "\n<html>\n<body>\n"
        f"<h1>Resultados para: {query}</h1>\n"
        "</body>\n</html>\n"
    )
    return page
    # Attack: /search?q=<script>alert('XSS')</script>
    # The JavaScript runs!
# ✅ SAFE
from markupsafe import escape


@app.route('/search')
def search():
    """Render the search heading with the query HTML-escaped."""
    query = request.args.get('q', '')
    # Escape HTML entities before the value touches the markup
    safe_query = escape(query)
    page = (
        "\n<html>\n<body>\n"
        f"<h1>Resultados para: {safe_query}</h1>\n"
        "</body>\n</html>\n"
    )
    return page
    # Or better: use a template engine with auto-escape. This is the
    # alternative to the return above; as written it is never reached.
    return render_template('search.html', query=query)
<!-- templates/search.html (Jinja2 auto-escapes by default) -->
<!DOCTYPE html>
<html>
<body>
<h1>Resultados para: {{ query }}</h1>
<!-- If you NEED unescaped HTML (DANGEROUS) -->
<!-- NOTE(review): Jinja2 presumably still evaluates expressions inside HTML comments; confirm the next line is illustration only -->
<!-- <div>{{ content | safe }}</div> -->
</body>
</html>
2. Stored XSS (Persistente)
# ❌ VULNERABLE
@app.route('/comment', methods=['POST'])
def add_comment():
    """Deliberately vulnerable: persists the raw comment text."""
    comment = request.json.get('comment')
    # Stored without sanitizing
    record = {
        'text': comment,
        'created_at': datetime.now(),
    }
    db.comments.insert_one(record)
    return jsonify({'status': 'success'})
@app.route('/comments')
def get_comments():
    """Deliberately vulnerable: concatenates stored comments into HTML."""
    comments = list(db.comments.find())
    # Rendered without escaping
    paragraphs = ''.join(f"<p>{comment['text']}</p>" for comment in comments)
    return '<div>' + paragraphs + '</div>'
    # Attack: comment = "<img src=x onerror='alert(document.cookie)'>"
    # It runs for EVERY user who views the comments!
# ✅ SAFE
import bleach


@app.route('/comment', methods=['POST'])
def add_comment():
    """Store a comment after stripping every tag outside a small allow-list."""
    raw_comment = request.json.get('comment')
    # Sanitize the HTML, keeping only safe tags and no attributes at all
    allowed_tags = ['b', 'i', 'u', 'p', 'br']
    clean_comment = bleach.clean(
        raw_comment, tags=allowed_tags, attributes={}, strip=True
    )
    record = {'text': clean_comment, 'created_at': datetime.now()}
    db.comments.insert_one(record)
    return jsonify({'status': 'success'})
@app.route('/comments')
def get_comments():
    """Render all stored comments through the comments.html template."""
    # Rendered with auto-escape by the template engine
    stored_comments = list(db.comments.find())
    return render_template('comments.html', comments=stored_comments)
3. DOM-based XSS (Client-side)
<!-- ❌ VULNERABLE -->
<script>
  function displayName() {
    // Read the "name" parameter from the URL
    const params = new URLSearchParams(window.location.search);
    const name = params.get('name');
    // Inserted into the DOM unsanitized: innerHTML parses the value as markup
    const greeting = document.getElementById('greeting');
    greeting.innerHTML = `Hello, ${name}`;
    // Attack: ?name=<img src=x onerror='alert(1)'>
  }
</script>
<!-- ✅ SAFE -->
<script>
  function displayName() {
    const params = new URLSearchParams(window.location.search);
    const name = params.get('name');
    const greeting = document.getElementById('greeting');
    // Use textContent instead of innerHTML: the value is inserted as text
    greeting.textContent = `Hello, ${name}`;
    // Or sanitize with DOMPurify before assigning markup
    const cleanName = DOMPurify.sanitize(name);
    greeting.innerHTML = `Hello, ${cleanName}`;
  }
</script>
Content Security Policy (CSP):
from flask import Flask, make_response
app = Flask(__name__)


@app.after_request
def set_csp(response):
    """Attach a Content Security Policy and related security headers.

    Args:
        response: The outgoing Flask response.

    Returns:
        The same response object with the headers set.
    """
    csp = {
        "default-src": "'self'",
        # No 'unsafe-inline' for scripts: allowing inline script lets an
        # injected <script> or event handler run, which is exactly what the
        # policy is meant to stop. Use nonces or hashes if inline scripts
        # are required.
        "script-src": "'self' https://cdn.example.com",
        "style-src": "'self' 'unsafe-inline'",
        "img-src": "'self' data: https:",
        "font-src": "'self' https://fonts.gstatic.com",
        "connect-src": "'self' https://api.example.com",
        "frame-ancestors": "'none'",
        "base-uri": "'self'",
        "form-action": "'self'"
    }
    csp_string = '; '.join(f"{directive} {sources}" for directive, sources in csp.items())
    response.headers['Content-Security-Policy'] = csp_string
    # Other security headers
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    # The legacy XSS auditor is deprecated and can itself be abused; current
    # guidance is to switch it off explicitly and rely on the CSP above.
    response.headers['X-XSS-Protection'] = '0'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    return response
Receta 4.4: CSRF (Cross-Site Request Forgery)
¿Qué es CSRF? Un atacante engaña a un usuario autenticado para que su navegador ejecute acciones no deseadas en un sitio donde ya tiene una sesión iniciada.
Escenario del ataque:
<!-- Malicious site: evil.com -->
<html>
<body>
<h1>¡Ganaste un premio!</h1>
<!-- Hidden form that performs an action on bank.com -->
<form id="evil" action="https://bank.com/transfer" method="POST">
<input type="hidden" name="to" value="attacker">
<input type="hidden" name="amount" value="10000">
</form>
<script>
// Auto-submit as soon as the user visits the page
document.getElementById('evil').submit();
</script>
</body>
</html>
<!-- If the user is authenticated on bank.com, the transfer goes through! -->
❌ VULNERABLE:
@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    """Deliberately vulnerable: performs the transfer with no CSRF check."""
    # No CSRF protection: any cross-site form post is accepted
    form = request.form
    to = form.get('to')
    amount = form.get('amount')
    # Perform the transfer
    execute_transfer(current_user, to, amount)
    return jsonify({'status': 'success'})
✅ SEGURO - CSRF Tokens:
from flask_wtf.csrf import CSRFProtect
import secrets
import os

app = Flask(__name__)
# The key that signs sessions and CSRF tokens must never live in source
# control; read it from the environment and fail fast (KeyError) if unset.
app.config['SECRET_KEY'] = os.environ['SECRET_KEY']
csrf = CSRFProtect(app)
@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
    """Perform a transfer from the posted form fields."""
    # Flask-WTF verifies the CSRF token automatically before this body runs
    recipient = request.form.get('to')
    amount = request.form.get('amount')
    execute_transfer(current_user, recipient, amount)
    return jsonify({'status': 'success'})
<!-- Template with CSRF token -->
<form method="POST" action="/transfer">
<!-- CSRF token: hidden field that is submitted together with the form -->
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="text" name="to" placeholder="Destinatario">
<input type="number" name="amount" placeholder="Monto">
<button type="submit">Transferir</button>
</form>
Para APIs (SPA con AJAX):
# Backend: generate a token and send it in a cookie
from flask import jsonify, make_response


@app.route('/api/csrf-token')
def get_csrf_token():
    """Issue a fresh CSRF token in the session, the JSON body and a cookie."""
    token = secrets.token_urlsafe(32)
    # Keep a server-side copy in the session for later comparison
    session['csrf_token'] = token
    response = make_response(jsonify({'csrf_token': token}))
    # Cookie with SameSite; httponly stays False because JS must read it
    cookie_options = {'httponly': False, 'secure': True, 'samesite': 'Strict'}
    response.set_cookie('csrf_token', token, **cookie_options)
    return response
@app.route('/api/transfer', methods=['POST'])
@login_required
def api_transfer():
    """Perform a transfer after checking the X-CSRF-Token header.

    Returns:
        JSON success, or a 403 JSON error when the header token is missing
        or does not equal the token stored in the session.
    """
    import hmac

    # Validate the token from the header against the session copy
    token_from_header = request.headers.get('X-CSRF-Token')
    token_from_session = session.get('csrf_token')
    # compare_digest() takes the same time wherever the values differ, so
    # response timing does not leak how much of a guess was right. Both
    # sides are encoded because it rejects non-ASCII str arguments.
    tokens_match = (
        bool(token_from_header)
        and bool(token_from_session)
        and hmac.compare_digest(
            token_from_header.encode(), token_from_session.encode()
        )
    )
    if not tokens_match:
        return jsonify({'error': 'CSRF token mismatch'}), 403
    # Process the transfer
    data = request.json
    execute_transfer(current_user, data['to'], data['amount'])
    return jsonify({'status': 'success'})
// Frontend: include the token in requests
async function transfer(to, amount) {
  // Fetch a token first
  const tokenResponse = await fetch('/api/csrf-token');
  const tokenPayload = await tokenResponse.json();

  // Send it back in a custom header
  const requestOptions = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-CSRF-Token': tokenPayload.csrf_token
    },
    body: JSON.stringify({ to, amount })
  };
  const response = await fetch('/api/transfer', requestOptions);
  return response.json();
}
SameSite Cookie Attribute:
# Additional protection against CSRF
@app.after_request
def set_cookie_samesite(response):
    """Ensure every Set-Cookie header on the response carries SameSite.

    Cookies that already declare SameSite (in any letter case) are left
    untouched; the others get ``SameSite=Lax`` appended.

    Args:
        response: The outgoing Flask response.

    Returns:
        The same response object with its Set-Cookie headers rewritten.
    """
    cookies = response.headers.getlist('Set-Cookie')
    if not cookies:
        return response
    # Drop the originals first: add() appends, so keeping them would send
    # each cookie twice, once without SameSite.
    response.headers.remove('Set-Cookie')
    for cookie in cookies:
        # Inspect only the attributes after "name=value", so a cookie whose
        # value merely contains the word is not mistaken for one that
        # already sets the attribute.
        attributes = cookie.split(';')[1:]
        has_samesite = any(
            attribute.strip().lower().startswith('samesite')
            for attribute in attributes
        )
        if not has_samesite:
            cookie += '; SameSite=Lax'
        response.headers.add('Set-Cookie', cookie)
    return response
# SameSite options:
# - Strict: the cookie is never sent on cross-site requests (most secure)
# - Lax: the cookie is sent on top-level navigation (balance)
# - None: the cookie is always sent (requires the Secure flag)
response.set_cookie(
'session',
session_id,
httponly=True,
secure=True,
samesite='Strict'
)
Receta 4.5: Authentication & Authorization
A7: Identification and Authentication Failures
Autenticación Segura
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
from datetime import datetime, timedelta
import jwt
class AuthManager:
    """Secure authentication helpers: password hashing, strength rules, JWTs."""

    SECRET_KEY = 'your-secret-key'  # Placeholder; in production load it from the environment

    @staticmethod
    def hash_password(password: str) -> str:
        """Return a salted PBKDF2-SHA256 hash of ``password``."""
        # Consider bcrypt or argon2 in production
        return generate_password_hash(password, method='pbkdf2:sha256', salt_length=16)

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Return True when ``password`` matches the stored ``password_hash``."""
        return check_password_hash(password_hash, password)

    @staticmethod
    def validate_password_strength(password: str) -> tuple[bool, str]:
        """Check a password against the length, character-class and blocklist rules.

        Args:
            password: Candidate password.

        Returns:
            ``(True, "Password is strong")`` when every rule passes,
            otherwise ``(False, reason)`` for the first rule that fails.
        """
        if len(password) < 12:
            return False, "Password must be at least 12 characters"
        if not any(c.isupper() for c in password):
            return False, "Password must contain uppercase letter"
        if not any(c.islower() for c in password):
            return False, "Password must contain lowercase letter"
        if not any(c.isdigit() for c in password):
            return False, "Password must contain digit"
        if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in password):
            return False, "Password must contain special character"
        # Check against common passwords. Every entry is shorter than the
        # 12-character minimum, so an exact-match test could never succeed
        # at this point; look for them as substrings instead, which catches
        # padded variants such as "Password123!!".
        common_passwords = ['password123', 'qwerty123', '123456789']
        lowered = password.lower()
        if any(common in lowered for common in common_passwords):
            return False, "Password is too common"
        return True, "Password is strong"

    @staticmethod
    def generate_jwt(user_id: str, expires_in: int = 3600) -> str:
        """Create an HS256-signed JWT for ``user_id``.

        Args:
            user_id: Value stored in the ``user_id`` claim.
            expires_in: Lifetime in seconds from now.

        Returns:
            The encoded token.
        """
        from datetime import timezone

        # One timezone-aware timestamp for both claims: datetime.utcnow() is
        # deprecated, and two separate calls could disagree slightly.
        now = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'exp': now + timedelta(seconds=expires_in),
            'iat': now
        }
        return jwt.encode(payload, AuthManager.SECRET_KEY, algorithm='HS256')

    @staticmethod
    def verify_jwt(token: str) -> dict:
        """Validate a token's signature and expiry.

        Returns:
            ``{'valid': True, 'user_id': ...}`` on success, otherwise
            ``{'valid': False, 'error': ...}``.
        """
        try:
            payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return {'valid': False, 'error': 'Token expired'}
        except jwt.InvalidTokenError:
            return {'valid': False, 'error': 'Invalid token'}
        if 'user_id' not in payload:
            # A correctly signed token without the claim is still unusable;
            # report it rather than raising KeyError.
            return {'valid': False, 'error': 'Invalid token'}
        return {'valid': True, 'user_id': payload['user_id']}
# Usage
@app.route('/register', methods=['POST'])
def register():
    """Create a user after checking password strength and hashing the password."""
    data = request.json
    password = data.get('password')
    # Validate strength before doing any work
    strong_enough, reason = AuthManager.validate_password_strength(password)
    if not strong_enough:
        return jsonify({'error': reason}), 400
    # Only the hash is ever stored
    password_hash = AuthManager.hash_password(password)
    # Create the user
    new_user = {
        'email': data['email'],
        'password': password_hash,
        'created_at': datetime.utcnow(),
    }
    result = db.users.insert_one(new_user)
    return jsonify({'status': 'success', 'user_id': str(result.inserted_id)})
@app.route('/login', methods=['POST'])
def login():
    """Authenticate by email and password and return a JWT.

    Returns 429 when the client address is rate limited and 401 with a
    generic message when the credentials do not verify.
    """
    data = request.json
    # Rate limiting (prevents brute force)
    if is_rate_limited(request.remote_addr):
        return jsonify({'error': 'Too many attempts'}), 429
    # Look up the user
    user = db.users.find_one({'email': data['email']})
    authenticated = user and AuthManager.verify_password(data['password'], user['password'])
    if not authenticated:
        # Record the failed attempt
        log_failed_login(request.remote_addr, data['email'])
        # Generic message (does not reveal whether the user exists)
        return jsonify({'error': 'Invalid credentials'}), 401
    # Issue the token
    return jsonify({'token': AuthManager.generate_jwt(str(user['_id']))})
Multi-Factor Authentication (MFA)
import pyotp
import qrcode
from io import BytesIO
import base64
class MFAManager:
    """TOTP-based multi-factor authentication helpers."""

    @staticmethod
    def generate_secret() -> str:
        """Return a new random base32 secret for TOTP."""
        return pyotp.random_base32()

    @staticmethod
    def generate_qr_code(secret: str, user_email: str, issuer: str = 'MyApp') -> str:
        """Return the provisioning QR code as a ``data:image/png;base64,...`` URI."""
        uri = pyotp.TOTP(secret).provisioning_uri(
            name=user_email,
            issuer_name=issuer,
        )
        # Build the QR code for the provisioning URI
        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(uri)
        code.make(fit=True)
        image = code.make_image(fill_color="black", back_color="white")
        # Serialize to PNG in memory, then to base64
        png = BytesIO()
        image.save(png, format='PNG')
        encoded = base64.b64encode(png.getvalue()).decode()
        return "data:image/png;base64," + encoded

    @staticmethod
    def verify_totp(secret: str, code: str) -> bool:
        """Return True when ``code`` is a valid TOTP code for ``secret``."""
        # valid_window=1 also accepts one code before/after the current one
        return pyotp.TOTP(secret).verify(code, valid_window=1)
# Endpoints
@app.route('/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
    """Start MFA enrolment: store a new secret (not yet enabled) and return its QR code."""
    # Generate the secret
    secret = MFAManager.generate_secret()
    # Store the secret (encrypt it in production); MFA stays disabled until
    # the user proves possession of it
    pending_state = {'mfa_secret': secret, 'mfa_enabled': False}
    db.users.update_one({'_id': current_user['_id']}, {'$set': pending_state})
    # Build the QR code; the raw secret is returned too, for manual entry
    enrolment = {
        'qr_code': MFAManager.generate_qr_code(secret, current_user['email']),
        'secret': secret,
    }
    return jsonify(enrolment)
@app.route('/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
    """Verify the first TOTP code and activate MFA.

    Returns:
        JSON with the one-time list of plaintext backup codes on success,
        or a 400 JSON error when the code is wrong or no secret has been
        set up for the user.
    """
    code = request.json.get('code')
    user = db.users.find_one({'_id': current_user['_id']})
    # The secret is absent when setup has not been started; treat that as
    # an invalid code instead of raising KeyError.
    secret = user.get('mfa_secret') if user else None
    if not secret or not MFAManager.verify_totp(secret, code):
        return jsonify({'error': 'Invalid code'}), 400
    # Generate backup codes; only their hashes are stored
    backup_codes = [secrets.token_hex(8) for _ in range(10)]
    # Activate MFA and store the backup codes in ONE update, so a failure
    # cannot leave MFA enabled without any backup codes.
    db.users.update_one(
        {'_id': current_user['_id']},
        {'$set': {
            'mfa_enabled': True,
            'backup_codes': [hash_code(c) for c in backup_codes],
        }}
    )
    return jsonify({
        'status': 'success',
        'backup_codes': backup_codes  # Shown ONCE
    })
@app.route('/login/mfa', methods=['POST'])
def login_with_mfa():
    """Complete a login by checking a TOTP code or a backup code.

    Returns:
        JSON with a JWT on success, or a 401 JSON error when the temporary
        token, the user, or the MFA code is invalid.
    """
    data = request.json
    temp_token = data.get('temp_token')
    mfa_code = data.get('mfa_code')
    # Verify the temporary token
    payload = verify_temp_token(temp_token)
    if not payload:
        return jsonify({'error': 'Invalid token'}), 401
    user = db.users.find_one({'_id': payload['user_id']})
    if not user:
        # The account may have been removed after the temp token was
        # issued; answer 401 instead of crashing on a None user.
        return jsonify({'error': 'Invalid token'}), 401
    # Verify the TOTP code first, then fall back to a backup code
    secret = user.get('mfa_secret')
    totp_ok = bool(secret) and MFAManager.verify_totp(secret, mfa_code)
    if totp_ok or verify_backup_code(user, mfa_code):
        token = AuthManager.generate_jwt(str(user['_id']))
        return jsonify({'token': token})
    return jsonify({'error': 'Invalid MFA code'}), 401
Role-Based Access Control (RBAC)
from functools import wraps
from flask import g
class Role:
    """Role names stored on user records."""
    GUEST = 'guest'
    USER = 'user'
    MODERATOR = 'moderator'
    ADMIN = 'admin'


class Permission:
    """Permission names that routes can require."""
    READ = 'read'
    WRITE = 'write'
    DELETE = 'delete'
    ADMIN = 'admin'


# Permissions from least to most privileged; each role gets a prefix of it.
_PERMISSION_LADDER = [
    Permission.READ,
    Permission.WRITE,
    Permission.DELETE,
    Permission.ADMIN,
]

# Permission matrix per role
ROLE_PERMISSIONS = {
    Role.ADMIN: _PERMISSION_LADDER[:4],
    Role.MODERATOR: _PERMISSION_LADDER[:3],
    Role.USER: _PERMISSION_LADDER[:2],
    Role.GUEST: _PERMISSION_LADDER[:1],
}
def require_permission(permission: str):
    """Build a decorator that only runs the view when the user holds ``permission``.

    The wrapped view answers 401 when no user is attached to ``g`` and 403
    when the user's role does not grant the permission.

    Args:
        permission: One of the ``Permission`` values.

    Returns:
        A decorator for Flask view functions.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr(): `g.user` raises AttributeError when nothing has set
            # it for this request, which would surface as a 500, not a 401.
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401
            # Users without a role are treated as guests; unknown roles get
            # no permissions at all.
            user_role = user.get('role', Role.GUEST)
            user_permissions = ROLE_PERMISSIONS.get(user_role, [])
            if permission not in user_permissions:
                return jsonify({'error': 'Insufficient permissions'}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def require_role(role: str):
    """Build a decorator that only runs the view for users with exactly ``role``.

    The wrapped view answers 401 when no user is attached to ``g`` and 403
    when the user's role differs.

    Args:
        role: One of the ``Role`` values.

    Returns:
        A decorator for Flask view functions.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # getattr(): `g.user` raises AttributeError when nothing has set
            # it for this request, which would surface as a 500, not a 401.
            user = getattr(g, 'user', None)
            if not user:
                return jsonify({'error': 'Not authenticated'}), 401
            if user.get('role') != role:
                return jsonify({'error': 'Insufficient permissions'}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
# Usage
@app.route('/admin/users')
@require_role(Role.ADMIN)
def list_users():
    """Only admins can list users; the password field is excluded from the query."""
    without_password = {'password': 0}
    return jsonify(list(db.users.find({}, without_password)))
@app.route('/posts/<post_id>', methods=['DELETE'])
@require_permission(Permission.DELETE)
def delete_post(post_id):
    """Delete a post; requires the DELETE permission (admins and moderators).

    Returns:
        JSON success, a 404 JSON error when the post does not exist, or a
        403 JSON error when a plain user tries to delete someone else's post.
    """
    post = db.posts.find_one({'_id': post_id})
    if post is None:
        # Without this guard a missing post crashes on post['author_id'].
        return jsonify({'error': 'Post not found'}), 404
    # Ownership check (users may only delete their own posts).
    # NOTE(review): ROLE_PERMISSIONS gives Role.USER no DELETE permission,
    # so with the decorator above this branch looks unreachable - confirm
    # whether users are meant to be able to delete their own posts.
    if g.user['role'] == Role.USER and post['author_id'] != g.user['_id']:
        return jsonify({'error': 'Can only delete own posts'}), 403
    db.posts.delete_one({'_id': post_id})
    return jsonify({'status': 'success'})
Receta 4.6: Encriptación y Gestión de Secretos
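Encriptación en tránsito vs en reposo: los datos en tránsito se protegen con TLS/HTTPS; el código siguiente cubre la encriptación en reposo (datos almacenados):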
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
# The PBKDF2 implementation in `cryptography` is named PBKDF2HMAC; the module
# has no `PBKDF2` class, so importing that name fails at import time.
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os


class EncryptionManager:
    """Symmetric encryption helpers built on Fernet."""

    @staticmethod
    def generate_key() -> bytes:
        """Return a new random Fernet key."""
        return Fernet.generate_key()

    @staticmethod
    def derive_key_from_password(
        password: str,
        salt: bytes = None,
        iterations: int = 100000,
    ) -> tuple[bytes, bytes]:
        """Derive a Fernet-compatible key from a password with PBKDF2-HMAC-SHA256.

        Args:
            password: Password to derive the key from.
            salt: Salt to use; a random 16-byte salt is generated when None.
            iterations: PBKDF2 iteration count. The same value must be used
                to re-derive an existing key.

        Returns:
            ``(key, salt)``; keep the salt to derive the same key again.
        """
        if salt is None:
            salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=iterations
        )
        # Fernet expects the 32 raw bytes in URL-safe base64 form
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

    @staticmethod
    def encrypt(data: str, key: bytes) -> str:
        """Encrypt ``data`` with ``key`` and return the Fernet token as text."""
        f = Fernet(key)
        encrypted = f.encrypt(data.encode())
        return encrypted.decode()

    @staticmethod
    def decrypt(encrypted_data: str, key: bytes) -> str:
        """Decrypt a Fernet token produced by ``encrypt`` and return the text."""
        f = Fernet(key)
        decrypted = f.decrypt(encrypted_data.encode())
        return decrypted.decode()
# Example: encrypting sensitive data in the DB
class UserSecrets:
    """Stores and retrieves sensitive user data in encrypted form."""

    def __init__(self, encryption_key: bytes):
        """Keep the key used for every encrypt/decrypt call."""
        self.encryption_manager = EncryptionManager()
        self.key = encryption_key

    def store_credit_card(self, user_id: str, card_number: str, cvv: str):
        """Store a card number encrypted, plus its last four digits.

        Args:
            user_id: Owner of the payment method.
            card_number: Full card number; stored encrypted.
            cvv: Accepted for interface compatibility but deliberately NOT
                persisted: card verification codes must not be stored after
                authorization, not even encrypted (PCI DSS).
        """
        encrypted_card = self.encryption_manager.encrypt(card_number, self.key)
        # Store (only the last 4 digits stay in plain text, for the UI)
        db.user_payment_methods.insert_one({
            'user_id': user_id,
            'card_number_encrypted': encrypted_card,
            'last_four': card_number[-4:],
            'created_at': datetime.utcnow()
        })

    def get_credit_card(self, user_id: str) -> "dict | None":
        """Return the decrypted card for ``user_id``, or None when none is stored."""
        card = db.user_payment_methods.find_one({'user_id': user_id})
        if not card:
            return None
        card_number = self.encryption_manager.decrypt(
            card['card_number_encrypted'],
            self.key
        )
        return {
            'card_number': card_number,
            'last_four': card['last_four']
        }
Gestión de Secretos:
# ❌ NEVER do this
API_KEY = "sk_live_abc123" # Hardcoded in source code
DATABASE_PASSWORD = "mypassword123"
# ✅ GOOD: environment variables
import os
from dotenv import load_dotenv
load_dotenv()
# os.getenv() returns None when the variable is not set
API_KEY = os.getenv('API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')
# ✅ MEJOR: Gestores de secretos (AWS Secrets Manager, HashiCorp Vault)
import boto3
import json
class SecretsManager:
    """Wrapper around AWS Secrets Manager with an in-memory cache."""

    def __init__(self, region_name='us-east-1'):
        """Create the Secrets Manager client for ``region_name``."""
        self.client = boto3.client('secretsmanager', region_name=region_name)
        self._cache = {}

    def get_secret(self, secret_name: str) -> dict:
        """Return the JSON-decoded secret, serving repeat calls from the cache.

        Args:
            secret_name: Identifier passed as ``SecretId``.

        Returns:
            The secret's ``SecretString`` parsed as JSON.

        Raises:
            Exception: Any error from the client or from JSON decoding is
                logged and re-raised unchanged.
        """
        if secret_name in self._cache:
            return self._cache[secret_name]
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            secret = json.loads(response['SecretString'])
        except Exception:
            # Log through the logging system (with traceback) rather than
            # print(), then let the caller decide how to handle it.
            import logging
            logging.getLogger(__name__).exception(
                "Error retrieving secret %s", secret_name
            )
            raise
        self._cache[secret_name] = secret
        return secret

    def rotate_secret(self, secret_name: str, new_value: dict):
        """Replace the secret's value and drop any cached copy."""
        self.client.update_secret(
            SecretId=secret_name,
            SecretString=json.dumps(new_value)
        )
        # Invalidate the cache so the next read fetches the new value
        self._cache.pop(secret_name, None)
# Usage
from urllib.parse import quote

# Named secrets_manager rather than "secrets": that name would shadow the
# standard-library `secrets` module used elsewhere for token generation.
secrets_manager = SecretsManager()
db_creds = secrets_manager.get_secret('myapp/database')
# Percent-encode the credentials: a generated password may contain '@', ':'
# or '/', which would otherwise corrupt the URL.
DATABASE_URL = (
    f"postgresql://{quote(db_creds['username'], safe='')}"
    f":{quote(db_creds['password'], safe='')}"
    f"@{db_creds['host']}/{db_creds['database']}"
)
api_keys = secrets_manager.get_secret('myapp/api-keys')
STRIPE_KEY = api_keys['stripe']
SENDGRID_KEY = api_keys['sendgrid']
Terraform - Gestión de secretos:
# secrets.tf
# Generate a random password
resource "random_password" "db_password" {
length = 32
special = true
}
# Store it in AWS Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
name = "myapp/database"
recovery_window_in_days = 7
}
# The secret value is a JSON object with username, password, host and database
resource "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = aws_secretsmanager_secret.db_credentials.id
secret_string = jsonencode({
username = "postgres"
password = random_password.db_password.result
host = aws_db_instance.main.endpoint
database = "myapp"
})
}
# IAM policy for access: allows only GetSecretValue, and only on the
# db_credentials secret
resource "aws_iam_role_policy" "secrets_access" {
role = aws_iam_role.ecs_task.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"secretsmanager:GetSecretValue"
]
Resource = [
aws_secretsmanager_secret.db_credentials.arn
]
}
]
})
}
# ECS task with secrets
resource "aws_ecs_task_definition" "api" {
family = "myapp-api"
container_definitions = jsonencode([
{
name = "api"
image = "myapp:latest"
secrets = [
{
name = "DATABASE_URL"
# NOTE(review): valueFrom is the ARN of the whole secret, and the secret
# version stores a JSON object (username/password/host/database), so
# DATABASE_URL would presumably receive that JSON text rather than a
# connection URL - confirm.
valueFrom = aws_secretsmanager_secret.db_credentials.arn
}
]
}
])
}
Versión: 1.0 · Fecha: 2024 · Autor: Roadmap del Desarrollador del Futuro · Licencia: Uso educativo