🔒 Developer Cookbook - PHASE 4: Security and Observability
Practical recipes for building secure, monitored, and observable applications
📚 Table of Contents
Cybersecurity for Developers
Recipe 4.1: OWASP Top 10 - Critical Vulnerabilities
What is the OWASP Top 10? A list of the ten most critical security vulnerabilities in web applications, updated every 3-4 years.
OWASP Top 10 (2021):
| # | Vulnerability | Impact | Prevalence |
|---|---|---|---|
| 1 | Broken Access Control | 🔴 High | 🔴 Very common |
| 2 | Cryptographic Failures | 🔴 High | 🟡 Common |
| 3 | Injection | 🔴 High | 🟡 Common |
| 4 | Insecure Design | 🟡 Medium | 🟢 Moderate |
| 5 | Security Misconfiguration | 🟡 Medium | 🔴 Very common |
| 6 | Vulnerable Components | 🟡 Medium | 🔴 Very common |
| 7 | Identification/Auth Failures | 🔴 High | 🟡 Common |
| 8 | Software/Data Integrity | 🔴 High | 🟢 Moderate |
| 9 | Security Logging Failures | 🟡 Medium | 🟡 Common |
| 10 | Server-Side Request Forgery | 🟡 Medium | 🟢 Moderate |
Recipe 4.2: Injection Attacks - SQL, NoSQL, Command
A03: Injection
SQL Injection
❌ VULNERABLE:
# Flask/Python - NEVER do this
@app.route('/user/<user_id>')
def get_user(user_id):
# Vulnerable to SQL injection
query = f"SELECT * FROM users WHERE id = {user_id}"
cursor.execute(query)
# Attack: /user/1 OR 1=1
# Query: SELECT * FROM users WHERE id = 1 OR 1=1
# Returns ALL users
✅ SAFE - Prepared statements:
# ✅ GOOD: Use parameterized queries
@app.route('/user/<user_id>')
def get_user(user_id):
query = "SELECT * FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
# Or with an ORM (SQLAlchemy)
user = db.session.query(User).filter(User.id == user_id).first()
return jsonify(user.to_dict())
# ✅ GOOD: With additional validation
@app.route('/user/<int:user_id>') # The <int:> route converter ensures an integer ID
def get_user(user_id):
if not isinstance(user_id, int) or user_id < 1:
return jsonify({'error': 'Invalid user ID'}), 400
query = "SELECT * FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
Common SQL injection cases:
# 1. Login bypass
# ❌ VULNERABLE
username = request.form['username']
password = request.form['password']
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
# Attack: username = admin'--
# Query: SELECT * FROM users WHERE username='admin'-- AND password='...'
# The -- comments out the rest of the query: password check bypassed!
# ✅ SAFE
query = "SELECT * FROM users WHERE username=%s AND password=%s"
cursor.execute(query, (username, hashed_password))
# 2. UNION-based injection
# ❌ VULNERABLE
search = request.args.get('q')
query = f"SELECT title, content FROM articles WHERE title LIKE '%{search}%'"
# Attack: q=' UNION SELECT username, password FROM users--
# Extracts credentials!
# ✅ SAFE
query = "SELECT title, content FROM articles WHERE title LIKE %s"
cursor.execute(query, (f'%{search}%',))
# 3. Blind SQL Injection
# ❌ VULNERABLE
product_id = request.args.get('id')
query = f"SELECT * FROM products WHERE id={product_id}"
# Attack: id=1 AND (SELECT SLEEP(5))
# If the response takes 5 seconds, it's vulnerable!
# ✅ SAFE: Prepared statements + validation
if not product_id.isdigit():
return jsonify({'error': 'Invalid ID'}), 400
query = "SELECT * FROM products WHERE id=%s"
cursor.execute(query, (product_id,))
NoSQL Injection (MongoDB)
# ❌ VULNERABLE
@app.route('/login', methods=['POST'])
def login():
username = request.json.get('username')
password = request.json.get('password')
# NoSQL Injection vulnerable
user = db.users.find_one({
'username': username,
'password': password
})
# Attack JSON:
# {"username": {"$ne": null}, "password": {"$ne": null}}
# Returns the first user (admin bypass)
# ✅ SAFE
@app.route('/login', methods=['POST'])
def login():
username = request.json.get('username')
password = request.json.get('password')
# Validate types
if not isinstance(username, str) or not isinstance(password, str):
return jsonify({'error': 'Invalid credentials'}), 400
# Sanitize input (strip Mongo operators)
username = username.replace('$', '')
# Use explicit operators
user = db.users.find_one({
'username': {'$eq': username}
})
if user and check_password_hash(user['password'], password):
return jsonify({'token': generate_token(user['_id'])})
return jsonify({'error': 'Invalid credentials'}), 401
Command Injection
# ❌ VULNERABLE
import subprocess
@app.route('/ping')
def ping():
host = request.args.get('host')
# Command injection vulnerable
result = subprocess.run(f'ping -c 4 {host}', shell=True, capture_output=True)
# Attack: host=google.com; cat /etc/passwd
# Executes: ping -c 4 google.com; cat /etc/passwd
# ✅ SAFE
import subprocess
import re
@app.route('/ping')
def ping():
host = request.args.get('host')
# 1. Validate the format (only valid domains/IPs)
if not re.match(r'^[a-zA-Z0-9.-]+$', host):
return jsonify({'error': 'Invalid host'}), 400
# 2. Do NOT use shell=True
# 3. Pass the command as a list
try:
result = subprocess.run(
['ping', '-c', '4', host],
capture_output=True,
text=True,
timeout=10
)
return jsonify({'output': result.stdout})
except subprocess.TimeoutExpired:
return jsonify({'error': 'Timeout'}), 408
General prevention:
class InputValidator:
"""Validador de inputs"""
@staticmethod
def sanitize_string(input_str: str, max_length: int = 255) -> str:
"""Sanitizar string general"""
if not isinstance(input_str, str):
raise ValueError("Input must be string")
# Truncar
sanitized = input_str[:max_length]
# Eliminar caracteres peligrosos
dangerous_chars = ['<', '>', '&', '"', "'", ';', '|', '`', '$']
for char in dangerous_chars:
sanitized = sanitized.replace(char, '')
return sanitized.strip()
@staticmethod
def validate_email(email: str) -> bool:
"""Validar email"""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
@staticmethod
def validate_username(username: str) -> bool:
"""Validar username (alfanumérico + guiones)"""
import re
return bool(re.match(r'^[a-zA-Z0-9_-]{3,20}$', username))
@staticmethod
def validate_sql_safe(value: str) -> bool:
"""Verificar que no contenga SQL keywords"""
sql_keywords = [
'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
'ALTER', 'UNION', 'OR', 'AND', '--', '/*', '*/', 'xp_'
]
upper_value = value.upper()
return not any(keyword in upper_value for keyword in sql_keywords)
# Uso
validator = InputValidator()
@app.route('/register', methods=['POST'])
def register():
try:
username = validator.sanitize_string(request.json.get('username'))
email = request.json.get('email')
if not validator.validate_username(username):
return jsonify({'error': 'Invalid username'}), 400
if not validator.validate_email(email):
return jsonify({'error': 'Invalid email'}), 400
# Continuar con registro...
except ValueError as e:
return jsonify({'error': str(e)}), 400
Recipe 4.3: XSS (Cross-Site Scripting)
What is XSS? Injection of malicious JavaScript into web pages viewed by other users.
Types of XSS:
1. Reflected XSS (non-persistent)
# ❌ VULNERABLE
@app.route('/search')
def search():
query = request.args.get('q', '')
# XSS vulnerable - renders the input without escaping
return f"""
<html>
<body>
<h1>Resultados para: {query}</h1>
</body>
</html>
"""
# Attack: /search?q=<script>alert('XSS')</script>
# The JavaScript executes!
# ✅ SAFE
from markupsafe import escape
@app.route('/search')
def search():
query = request.args.get('q', '')
# Escape HTML entities
safe_query = escape(query)
return f"""
<html>
<body>
<h1>Resultados para: {safe_query}</h1>
</body>
</html>
"""
# Or better: use a template engine with auto-escaping
return render_template('search.html', query=query)
<!-- templates/search.html (Jinja2 auto-escapes by default) -->
<!DOCTYPE html>
<html>
<body>
<h1>Resultados para: {{ query }}</h1>
<!-- If you NEED unescaped HTML (DANGEROUS) -->
<!-- <div>{{ content | safe }}</div> -->
</body>
</html>
2. Stored XSS (Persistent)
# ❌ VULNERABLE
@app.route('/comment', methods=['POST'])
def add_comment():
comment = request.json.get('comment')
# Stored without sanitizing
db.comments.insert_one({
'text': comment,
'created_at': datetime.now()
})
return jsonify({'status': 'success'})
@app.route('/comments')
def get_comments():
comments = list(db.comments.find())
# Rendered without escaping
html = '<div>'
for comment in comments:
html += f"<p>{comment['text']}</p>"
html += '</div>'
return html
# Attack: comment = "<img src=x onerror='alert(document.cookie)'>"
# Executes for EVERY user who views the comments!
# ✅ SAFE
import bleach
@app.route('/comment', methods=['POST'])
def add_comment():
comment = request.json.get('comment')
# Sanitize the HTML, allowing only safe tags
clean_comment = bleach.clean(
comment,
tags=['b', 'i', 'u', 'p', 'br'], # Allowed tags
attributes={}, # No attributes
strip=True
)
db.comments.insert_one({
'text': clean_comment,
'created_at': datetime.now()
})
return jsonify({'status': 'success'})
@app.route('/comments')
def get_comments():
comments = list(db.comments.find())
# Render with auto-escaping
return render_template('comments.html', comments=comments)
3. DOM-based XSS (Client-side)
<!-- ❌ VULNERABLE -->
<script>
function displayName() {
// Read the value from the URL
const name = new URLSearchParams(window.location.search).get('name');
// Inserted into the DOM without sanitizing
document.getElementById('greeting').innerHTML = 'Hello, ' + name;
// Attack: ?name=<img src=x onerror='alert(1)'>
}
</script>
<!-- ✅ SEGURO -->
<script>
function displayName() {
const name = new URLSearchParams(window.location.search).get('name');
// Use textContent instead of innerHTML
document.getElementById('greeting').textContent = 'Hello, ' + name;
// Or sanitize with DOMPurify
const cleanName = DOMPurify.sanitize(name);
document.getElementById('greeting').innerHTML = 'Hello, ' + cleanName;
}
</script>
Content Security Policy (CSP):
from flask import Flask, make_response
app = Flask(__name__)
@app.after_request
def set_csp(response):
"""Configurar Content Security Policy"""
csp = {
"default-src": "'self'",
"script-src": "'self' 'unsafe-inline' https://cdn.example.com",
"style-src": "'self' 'unsafe-inline'",
"img-src": "'self' data: https:",
"font-src": "'self' https://fonts.gstatic.com",
"connect-src": "'self' https://api.example.com",
"frame-ancestors": "'none'",
"base-uri": "'self'",
"form-action": "'self'"
}
csp_string = '; '.join([f"{k} {v}" for k, v in csp.items()])
response.headers['Content-Security-Policy'] = csp_string
# Other security headers
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
return response
Recipe 4.4: CSRF (Cross-Site Request Forgery)
What is CSRF? An attacker tricks an authenticated user into executing actions they did not intend.
Attack scenario:
<!-- Malicious site: evil.com -->
<html>
<body>
<h1>You won a prize!</h1>
<!-- Hidden form that triggers an action on bank.com -->
<form id="evil" action="https://bank.com/transfer" method="POST">
<input type="hidden" name="to" value="attacker">
<input type="hidden" name="amount" value="10000">
</form>
<script>
// Auto-submit as soon as the user visits the page
document.getElementById('evil').submit();
</script>
</body>
</html>
<!-- If the user is authenticated on bank.com, the transfer goes through! -->
❌ VULNERABLE:
@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
# No CSRF protection
to = request.form.get('to')
amount = request.form.get('amount')
# Executes the transfer
execute_transfer(current_user, to, amount)
return jsonify({'status': 'success'})
✅ SAFE - CSRF tokens:
from flask_wtf.csrf import CSRFProtect
import secrets
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
csrf = CSRFProtect(app)
@app.route('/transfer', methods=['POST'])
@login_required
def transfer():
# Flask-WTF verifies the CSRF token automatically
to = request.form.get('to')
amount = request.form.get('amount')
execute_transfer(current_user, to, amount)
return jsonify({'status': 'success'})
<!-- Template with CSRF token -->
<form method="POST" action="/transfer">
<!-- CSRF token -->
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="text" name="to" placeholder="Recipient">
<input type="number" name="amount" placeholder="Amount">
<button type="submit">Transfer</button>
</form>
For APIs (SPA with AJAX):
# Backend: generate the token and send it in a cookie
from flask import jsonify, make_response
@app.route('/api/csrf-token')
def get_csrf_token():
token = secrets.token_urlsafe(32)
# Store it in the session
session['csrf_token'] = token
response = make_response(jsonify({'csrf_token': token}))
# Cookie with SameSite
response.set_cookie(
'csrf_token',
token,
httponly=False, # JS needs to read it
secure=True,
samesite='Strict'
)
return response
@app.route('/api/transfer', methods=['POST'])
@login_required
def api_transfer():
# Validate the token from the header (constant-time comparison)
token_from_header = request.headers.get('X-CSRF-Token')
token_from_session = session.get('csrf_token')
if not token_from_header or not token_from_session or not secrets.compare_digest(token_from_header, token_from_session):
return jsonify({'error': 'CSRF token mismatch'}), 403
# Process the transfer
data = request.json
execute_transfer(current_user, data['to'], data['amount'])
return jsonify({'status': 'success'})
// Frontend: include the token in requests
async function transfer(to, amount) {
// Get the token
const tokenResponse = await fetch('/api/csrf-token');
const { csrf_token } = await tokenResponse.json();
// Include it in a header
const response = await fetch('/api/transfer', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': csrf_token
},
body: JSON.stringify({ to, amount })
});
return response.json();
}
SameSite Cookie Attribute:
# Additional protection against CSRF
@app.after_request
def set_cookie_samesite(response):
"""Configurar SameSite en cookies"""
for cookie in response.headers.getlist('Set-Cookie'):
if 'SameSite' not in cookie:
response.headers.add('Set-Cookie', cookie + '; SameSite=Lax')
return response
# SameSite options:
# - Strict: the cookie is never sent on cross-site requests (most secure)
# - Lax: the cookie is sent on top-level navigation (good balance)
# - None: the cookie is always sent (requires the Secure flag)
response.set_cookie(
'session',
session_id,
httponly=True,
secure=True,
samesite='Strict'
)
Recipe 4.5: Authentication & Authorization
A07: Identification and Authentication Failures
Secure Authentication
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
from datetime import datetime, timedelta
import jwt
class AuthManager:
"""Gestión de autenticación segura"""
SECRET_KEY = 'your-secret-key' # In production: load from an environment variable
@staticmethod
def hash_password(password: str) -> str:
"""Hash password con salt"""
# Usar bcrypt o argon2 en producción
return generate_password_hash(password, method='pbkdf2:sha256', salt_length=16)
@staticmethod
def verify_password(password: str, password_hash: str) -> bool:
"""Verificar password"""
return check_password_hash(password_hash, password)
@staticmethod
def validate_password_strength(password: str) -> tuple[bool, str]:
"""Validar fortaleza del password"""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not any(c.isupper() for c in password):
return False, "Password must contain uppercase letter"
if not any(c.islower() for c in password):
return False, "Password must contain lowercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain digit"
if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in password):
return False, "Password must contain special character"
# Verificar contra contraseñas comunes
common_passwords = ['password123', 'qwerty123', '123456789']
if password.lower() in common_passwords:
return False, "Password is too common"
return True, "Password is strong"
@staticmethod
def generate_jwt(user_id: str, expires_in: int = 3600) -> str:
"""Generar JWT token"""
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(seconds=expires_in),
'iat': datetime.utcnow()
}
return jwt.encode(payload, AuthManager.SECRET_KEY, algorithm='HS256')
@staticmethod
def verify_jwt(token: str) -> dict:
"""Verificar JWT token"""
try:
payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=['HS256'])
return {'valid': True, 'user_id': payload['user_id']}
except jwt.ExpiredSignatureError:
return {'valid': False, 'error': 'Token expired'}
except jwt.InvalidTokenError:
return {'valid': False, 'error': 'Invalid token'}
# Uso
@app.route('/register', methods=['POST'])
def register():
data = request.json
password = data.get('password')
# Validar fortaleza
is_valid, message = AuthManager.validate_password_strength(password)
if not is_valid:
return jsonify({'error': message}), 400
# Hash password
password_hash = AuthManager.hash_password(password)
# Crear usuario
user = db.users.insert_one({
'email': data['email'],
'password': password_hash,
'created_at': datetime.utcnow()
})
return jsonify({'status': 'success', 'user_id': str(user.inserted_id)})
@app.route('/login', methods=['POST'])
def login():
data = request.json
# Rate limiting (prevent brute force); see the sketch after this block
if is_rate_limited(request.remote_addr):
return jsonify({'error': 'Too many attempts'}), 429
# Buscar usuario
user = db.users.find_one({'email': data['email']})
if not user or not AuthManager.verify_password(data['password'], user['password']):
# Registrar intento fallido
log_failed_login(request.remote_addr, data['email'])
# Generic message (don't reveal whether the user exists)
return jsonify({'error': 'Invalid credentials'}), 401
# Generar token
token = AuthManager.generate_jwt(str(user['_id']))
return jsonify({'token': token})
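The login endpoint above calls `is_rate_limited` and `log_failed_login` without defining them. A minimal sketch, assuming an in-memory fixed-window limiter is acceptable (in production, prefer a shared store such as Redis or an extension like Flask-Limiter so counters survive restarts and are shared across workers):
import time
from collections import defaultdict

_failed_attempts = defaultdict(list)  # ip -> timestamps of recent failures

MAX_ATTEMPTS = 5        # allowed failures per window
WINDOW_SECONDS = 300    # 5-minute window

def is_rate_limited(ip: str) -> bool:
    """True if this IP exceeded MAX_ATTEMPTS failed logins inside the window."""
    now = time.time()
    # Keep only the attempts that fall inside the current window
    _failed_attempts[ip] = [t for t in _failed_attempts[ip] if now - t < WINDOW_SECONDS]
    return len(_failed_attempts[ip]) >= MAX_ATTEMPTS

def log_failed_login(ip: str, email: str) -> None:
    """Record the failure for rate limiting and leave an audit trail."""
    _failed_attempts[ip].append(time.time())
    app.logger.warning('Failed login attempt from %s for %s', ip, email)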
Multi-Factor Authentication (MFA)
import pyotp
import qrcode
from io import BytesIO
import base64
class MFAManager:
"""Gestión de MFA (TOTP)"""
@staticmethod
def generate_secret() -> str:
"""Generar secret para TOTP"""
return pyotp.random_base32()
@staticmethod
def generate_qr_code(secret: str, user_email: str, issuer: str = 'MyApp') -> str:
"""Generar QR code para Google Authenticator"""
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user_email,
issuer_name=issuer
)
# Generar QR
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
# Convertir a base64
buffer = BytesIO()
img.save(buffer, format='PNG')
img_str = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/png;base64,{img_str}"
@staticmethod
def verify_totp(secret: str, code: str) -> bool:
"""Verificar código TOTP"""
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1) # Acepta 1 código antes/después
# Endpoints
@app.route('/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
"""Configurar MFA para usuario"""
user_id = current_user['_id']
# Generar secret
secret = MFAManager.generate_secret()
# Store the secret (encrypt it at rest in production)
db.users.update_one(
{'_id': user_id},
{'$set': {'mfa_secret': secret, 'mfa_enabled': False}}
)
# Generar QR
qr_code = MFAManager.generate_qr_code(secret, current_user['email'])
return jsonify({
'qr_code': qr_code,
'secret': secret # Para manual entry
})
@app.route('/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
"""Verificar y activar MFA"""
code = request.json.get('code')
user = db.users.find_one({'_id': current_user['_id']})
if MFAManager.verify_totp(user['mfa_secret'], code):
# Activar MFA
db.users.update_one(
{'_id': current_user['_id']},
{'$set': {'mfa_enabled': True}}
)
# Generar backup codes
backup_codes = [secrets.token_hex(8) for _ in range(10)]
db.users.update_one(
{'_id': current_user['_id']},
{'$set': {'backup_codes': [hash_code(c) for c in backup_codes]}}
)
return jsonify({
'status': 'success',
'backup_codes': backup_codes # Mostrar UNA VEZ
})
return jsonify({'error': 'Invalid code'}), 400
@app.route('/login/mfa', methods=['POST'])
def login_with_mfa():
"""Login con MFA"""
# Paso 1: Verificar email/password (devolver temp token)
# Paso 2: Verificar código MFA con temp token
data = request.json
temp_token = data.get('temp_token')
mfa_code = data.get('mfa_code')
# Verificar temp token
payload = verify_temp_token(temp_token)
if not payload:
return jsonify({'error': 'Invalid token'}), 401
user = db.users.find_one({'_id': payload['user_id']})
# Verificar código MFA
if MFAManager.verify_totp(user['mfa_secret'], mfa_code):
# Generar token final
token = AuthManager.generate_jwt(str(user['_id']))
return jsonify({'token': token})
# O verificar backup code
if verify_backup_code(user, mfa_code):
token = AuthManager.generate_jwt(str(user['_id']))
return jsonify({'token': token})
return jsonify({'error': 'Invalid MFA code'}), 401
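The MFA flow above relies on three helpers the recipe never defines: `hash_code`, `verify_backup_code`, and `verify_temp_token`. A minimal sketch under assumed semantics (backup codes stored hashed and single-use; the temporary token is a short-lived, MFA-scoped JWT), reusing the imports already present in this recipe:
from typing import Optional

def hash_code(code: str) -> str:
    """Backup codes are stored hashed, never in plain text."""
    return generate_password_hash(code)

def verify_backup_code(user: dict, code: str) -> bool:
    """Accept a backup code once, then invalidate it."""
    for stored_hash in user.get('backup_codes', []):
        if check_password_hash(stored_hash, code):
            # Single use: remove the consumed code
            db.users.update_one(
                {'_id': user['_id']},
                {'$pull': {'backup_codes': stored_hash}}
            )
            return True
    return False

def issue_temp_token(user_id: str) -> str:
    """Issued after the password step; only valid for completing MFA."""
    payload = {
        'user_id': user_id,
        'scope': 'mfa_pending',
        'exp': datetime.utcnow() + timedelta(minutes=5)
    }
    return jwt.encode(payload, AuthManager.SECRET_KEY, algorithm='HS256')

def verify_temp_token(token: str) -> Optional[dict]:
    """Return the payload only if the token is valid and MFA-scoped."""
    try:
        payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=['HS256'])
        return payload if payload.get('scope') == 'mfa_pending' else None
    except jwt.InvalidTokenError:
        return None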
Role-Based Access Control (RBAC)
from functools import wraps
from flask import g
class Role:
ADMIN = 'admin'
MODERATOR = 'moderator'
USER = 'user'
GUEST = 'guest'
class Permission:
READ = 'read'
WRITE = 'write'
DELETE = 'delete'
ADMIN = 'admin'
# Matriz de permisos por rol
ROLE_PERMISSIONS = {
Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
Role.USER: [Permission.READ, Permission.WRITE],
Role.GUEST: [Permission.READ]
}
def require_permission(permission: str):
"""Decorator para requerir permiso"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': 'Not authenticated'}), 401
user_role = g.user.get('role', Role.GUEST)
user_permissions = ROLE_PERMISSIONS.get(user_role, [])
if permission not in user_permissions:
return jsonify({'error': 'Insufficient permissions'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
def require_role(role: str):
"""Decorator para requerir rol específico"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': 'Not authenticated'}), 401
if g.user.get('role') != role:
return jsonify({'error': 'Insufficient permissions'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# Uso
@app.route('/admin/users')
@require_role(Role.ADMIN)
def list_users():
"""Solo admins pueden listar usuarios"""
users = list(db.users.find({}, {'password': 0}))
return jsonify(users)
@app.route('/posts/<post_id>', methods=['DELETE'])
@require_permission(Permission.DELETE)
def delete_post(post_id):
"""Requiere permiso de DELETE (admins y moderadores)"""
post = db.posts.find_one({'_id': post_id})
# Ownership check (regular users can only delete their own posts)
if g.user['role'] == Role.USER and post['author_id'] != g.user['_id']:
return jsonify({'error': 'Can only delete own posts'}), 403
db.posts.delete_one({'_id': post_id})
return jsonify({'status': 'success'})
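The decorators above read `g.user`, but the recipe does not show where it comes from. A minimal sketch, assuming the JWT helpers from the AuthManager above and a `before_request` hook that attaches the user to `flask.g`:
from flask import g, request

@app.before_request
def load_current_user():
    """Populate g.user (or None) so require_role/require_permission can check it."""
    g.user = None
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        result = AuthManager.verify_jwt(auth_header[len('Bearer '):])
        if result['valid']:
            # Never load the password hash into the request context
            g.user = db.users.find_one({'_id': result['user_id']}, {'password': 0})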
Recipe 4.6: Encryption and Secrets Management
Encryption in transit vs. at rest:
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class EncryptionManager:
"""Gestión de encriptación"""
@staticmethod
def generate_key() -> bytes:
"""Generar key para Fernet"""
return Fernet.generate_key()
@staticmethod
def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
"""Derivar key desde password usando PBKDF2"""
if salt is None:
salt = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key, salt
@staticmethod
def encrypt(data: str, key: bytes) -> str:
"""Encriptar data"""
f = Fernet(key)
encrypted = f.encrypt(data.encode())
return encrypted.decode()
@staticmethod
def decrypt(encrypted_data: str, key: bytes) -> str:
"""Desencriptar data"""
f = Fernet(key)
decrypted = f.decrypt(encrypted_data.encode())
return decrypted.decode()
# Example: encrypting sensitive data in the DB
class UserSecrets:
"""Gestión de datos sensibles de usuario"""
def __init__(self, encryption_key: bytes):
self.encryption_manager = EncryptionManager()
self.key = encryption_key
def store_credit_card(self, user_id: str, card_number: str, cvv: str):
"""Guardar tarjeta encriptada"""
# Encriptar
encrypted_card = self.encryption_manager.encrypt(card_number, self.key)
encrypted_cvv = self.encryption_manager.encrypt(cvv, self.key)
# Store it (only the last 4 digits in plain text, for the UI)
db.user_payment_methods.insert_one({
'user_id': user_id,
'card_number_encrypted': encrypted_card,
'cvv_encrypted': encrypted_cvv,
'last_four': card_number[-4:],
'created_at': datetime.utcnow()
})
def get_credit_card(self, user_id: str) -> dict:
"""Obtener tarjeta desencriptada"""
card = db.user_payment_methods.find_one({'user_id': user_id})
if not card:
return None
# Desencriptar
card_number = self.encryption_manager.decrypt(
card['card_number_encrypted'],
self.key
)
return {
'card_number': card_number,
'last_four': card['last_four']
}
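`UserSecrets` receives an `encryption_key`, but the recipe does not show where that key comes from. A minimal sketch, assuming the Fernet key is injected through an environment variable (or, better, through the secrets manager shown later in this recipe), never hardcoded:
import os

def load_encryption_key() -> bytes:
    """Load the Fernet key from the environment; fail fast if it is missing."""
    key = os.environ.get('FERNET_KEY')
    if not key:
        raise RuntimeError('FERNET_KEY is not configured')
    return key.encode()

user_secrets = UserSecrets(load_encryption_key())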
# HTTPS/TLS Configuration (Nginx)
"""
# nginx.conf
server {
listen 443 ssl http2;
server_name myapp.com;
# Certificados SSL (Let's Encrypt)
ssl_certificate /etc/letsencrypt/live/myapp.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/myapp.com/privkey.pem;
# SSL Configuration (Mozilla Intermediate)
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers 'ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384';
ssl_prefer_server_ciphers off;
# HSTS
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
# OCSP Stapling
ssl_stapling on;
ssl_stapling_verify on;
location / {
proxy_pass http://app:3000;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name myapp.com;
return 301 https://$host$request_uri;
}
"""
Secrets Management:
# ❌ NEVER do this
API_KEY = "sk_live_abc123" # Hardcoded in source code
DATABASE_PASSWORD = "mypassword123"
# ✅ GOOD: Environment variables
import os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv('API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')
# ✅ BETTER: Secret managers (AWS Secrets Manager, HashiCorp Vault)
import boto3
import json
class SecretsManager:
"""Wrapper para AWS Secrets Manager"""
def __init__(self, region_name='us-east-1'):
self.client = boto3.client('secretsmanager', region_name=region_name)
self._cache = {}
def get_secret(self, secret_name: str) -> dict:
"""Obtener secreto (con cache)"""
if secret_name in self._cache:
return self._cache[secret_name]
try:
response = self.client.get_secret_value(SecretId=secret_name)
secret = json.loads(response['SecretString'])
self._cache[secret_name] = secret
return secret
except Exception as e:
print(f"Error retrieving secret: {e}")
raise
def rotate_secret(self, secret_name: str, new_value: dict):
"""Rotar secreto"""
self.client.update_secret(
SecretId=secret_name,
SecretString=json.dumps(new_value)
)
# Invalidar cache
self._cache.pop(secret_name, None)
# Uso
secrets = SecretsManager()
# Obtener credenciales de DB
db_creds = secrets.get_secret('myapp/database')
DATABASE_URL = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/{db_creds['database']}"
# API keys
api_keys = secrets.get_secret('myapp/api-keys')
STRIPE_KEY = api_keys['stripe']
SENDGRID_KEY = api_keys['sendgrid']
Terraform - secrets management:
# secrets.tf
# Generate a random password
resource "random_password" "db_password" {
length = 32
special = true
}
# Store it in AWS Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
name = "myapp/database"
recovery_window_in_days = 7
}
resource "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = aws_secretsmanager_secret.db_credentials.id
secret_string = jsonencode({
username = "postgres"
password = random_password.db_password.result
host = aws_db_instance.main.endpoint
database = "myapp"
})
}
# IAM policy for access
resource "aws_iam_role_policy" "secrets_access" {
role = aws_iam_role.ecs_task.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"secretsmanager:GetSecretValue"
]
Resource = [
aws_secretsmanager_secret.db_credentials.arn
]
}
]
})
}
# ECS task with secrets
resource "aws_ecs_task_definition" "api" {
family = "myapp-api"
container_definitions = jsonencode([
{
name = "api"
image = "myapp:latest"
# Secrets injected from Secrets Manager
secrets = [
{
name = "DATABASE_URL"
valueFrom = aws_secretsmanager_secret.db_credentials.arn
}
]
}
])
}
Code Review and Static Analysis
Recipe 4.7: Static Analysis - SonarQube and CodeQL
SonarQube - Configuration:
# sonar-project.properties
sonar.projectKey=myapp
sonar.projectName=My Application
sonar.projectVersion=1.0
# Paths
sonar.sources=src
sonar.tests=tests
sonar.python.coverage.reportPaths=coverage.xml
# Exclusions
sonar.exclusions=**/node_modules/**,**/vendor/**,**/*.test.js
# Coverage exclusions
sonar.coverage.exclusions=**/*.test.js,**/tests/**
# Quality Gate
sonar.qualitygate.wait=true
GitHub Actions with SonarQube:
# .github/workflows/sonarqube.yml
name: SonarQube Analysis
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
sonarqube:
name: SonarQube Scan
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Disable shallow clone for more accurate analysis
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests with coverage
run: pytest --cov=src --cov-report=xml
- name: SonarQube Scan
uses: sonarsource/sonarqube-scan-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
- name: Check Quality Gate
uses: sonarsource/sonarqube-quality-gate-action@master
timeout-minutes: 5
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
CodeQL - Security analysis:
# .github/workflows/codeql.yml
name: CodeQL Analysis
on:
push:
branches: [main]
pull_request:
branches: [main]
schedule:
- cron: '0 0 * * 1' # Weekly scan
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: ['python', 'javascript']
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
queries: security-extended
- name: Autobuild
uses: github/codeql-action/autobuild@v2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
with:
category: "/language:${{ matrix.language }}"
Custom CodeQL Queries:
/**
* @name Hardcoded credentials
* @description Detects hardcoded passwords and API keys
* @kind problem
* @problem.severity error
* @id python/hardcoded-credentials
*/
import python
from StrConst str
where
(
str.getText().regexpMatch("(?i).*(password|passwd|pwd)\\s*=\\s*['\"][^'\"]+['\"].*") or
str.getText().regexpMatch("(?i).*(api_key|apikey|secret|token)\\s*=\\s*['\"][^'\"]+['\"].*")
)
and not str.getText().regexpMatch(".*=\\s*['\"]\\$\\{.*") // Exclude environment-variable interpolations
select str, "Potential hardcoded credential found"
Receta 4.8: Dependency Scanning
Dependabot Configuration:
# .github/dependabot.yml
version: 2
updates:
# Python dependencies
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
reviewers:
- "security-team"
labels:
- "dependencies"
- "security"
# Ignore patch-level version updates for all dependencies
ignore:
- dependency-name: "*"
update-types: ["version-update:semver-patch"]
# NPM dependencies
- package-ecosystem: "npm"
directory: "/frontend"
schedule:
interval: "weekly"
versioning-strategy: increase
# Docker
- package-ecosystem: "docker"
directory: "/"
schedule:
interval: "weekly"
# GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
Snyk - Vulnerability Scanning:
# .github/workflows/snyk.yml
name: Snyk Security
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Snyk to check for vulnerabilities
uses: snyk/actions/python@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
args: --severity-threshold=high
- name: Upload result to GitHub Code Scanning
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: snyk.sarif
OWASP Dependency-Check:
# Install
wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.0.0/dependency-check-8.0.0-release.zip
unzip dependency-check-8.0.0-release.zip
# Run the scan
./dependency-check/bin/dependency-check.sh \
--project "MyApp" \
--scan ./src \
--format HTML \
--format JSON \
--out ./reports
# With npm audit
npm audit --json > npm-audit.json
# Automatic fixes
npm audit fix
npm audit fix --force # accepts breaking changes
Safety (Python):
# Install
pip install safety
# Scan
safety check
# Against a requirements.txt
safety check -r requirements.txt
# JSON output
safety check --json > safety-report.json
# Ignore specific vulnerabilities
safety check --ignore 12345
Observability and Monitoring
Recipe 4.9: The 3 Pillars - Logs, Metrics, Traces
1. LOGS - Structured Logging:
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger
class CustomJsonFormatter(jsonlogger.JsonFormatter):
"""Formatter personalizado para logs estructurados"""
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
# Agregar campos custom
log_record['timestamp'] = datetime.utcnow().isoformat()
log_record['level'] = record.levelname
log_record['logger'] = record.name
# Agregar contexto si existe
if hasattr(record, 'user_id'):
log_record['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_record['request_id'] = record.request_id
# Configurar logger
def setup_logging():
"""Configurar logging estructurado"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Handler writing to stdout (Docker/K8s captures it)
handler = logging.StreamHandler()
# Formato JSON
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = setup_logging()
# Uso con contexto
class RequestContext:
"""Contexto de request para logging"""
def __init__(self):
self.request_id = None
self.user_id = None
def set_request_id(self, request_id):
self.request_id = request_id
def set_user_id(self, user_id):
self.user_id = user_id
context = RequestContext()
@app.before_request
def before_request():
"""Inicializar contexto de request"""
import uuid
context.set_request_id(str(uuid.uuid4()))
# Extraer user_id de JWT
token = request.headers.get('Authorization', '')
if token.startswith('Bearer '):
try:
# Strip the "Bearer " prefix before decoding
payload = jwt.decode(token[len('Bearer '):], SECRET_KEY, algorithms=['HS256'])
context.set_user_id(payload['user_id'])
except jwt.InvalidTokenError:
pass
# Logging with request context
@app.route('/api/orders', methods=['POST'])
def create_order():
logger.info(
'Creating order',
extra={
'request_id': context.request_id,
'user_id': context.user_id,
'endpoint': '/api/orders',
'method': 'POST'
}
)
try:
order = process_order(request.json)
logger.info(
'Order created successfully',
extra={
'request_id': context.request_id,
'user_id': context.user_id,
'order_id': order['id'],
'total': order['total']
}
)
return jsonify(order), 201
except Exception as e:
logger.error(
'Failed to create order',
extra={
'request_id': context.request_id,
'user_id': context.user_id,
'error': str(e),
'error_type': type(e).__name__
},
exc_info=True # Include stack trace
)
return jsonify({'error': 'Internal error'}), 500
# Appropriate log levels
logger.debug('Detailed info for debugging') # Dev only
logger.info('Normal operation') # Significant events
logger.warning('Something unexpected') # Potential problem
logger.error('Error occurred') # Error affecting functionality
logger.critical('System unstable') # Requires immediate attention
2. METRICS - Prometheus:
from prometheus_client import Counter, Histogram, Gauge, Summary, generate_latest
from prometheus_client import REGISTRY
from flask import Response
import time
# Definir métricas
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
http_request_duration_seconds = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint']
)
active_users = Gauge(
'active_users',
'Number of active users'
)
order_value = Summary(
'order_value_dollars',
'Order value in dollars'
)
# Middleware para instrumentar requests
@app.before_request
def before_request_metrics():
request.start_time = time.time()
@app.after_request
def after_request_metrics(response):
# Incrementar contador
http_requests_total.labels(
method=request.method,
endpoint=request.endpoint or 'unknown',
status=response.status_code
).inc()
# Registrar duración
if hasattr(request, 'start_time'):
duration = time.time() - request.start_time
http_request_duration_seconds.labels(
method=request.method,
endpoint=request.endpoint or 'unknown'
).observe(duration)
return response
# Endpoint de métricas
@app.route('/metrics')
def metrics():
"""Endpoint para Prometheus scraping"""
return Response(generate_latest(REGISTRY), mimetype='text/plain')
# Métricas de negocio
@app.route('/api/orders', methods=['POST'])
def create_order():
order = process_order(request.json)
# Registrar valor de orden
order_value.observe(order['total'])
return jsonify(order)
# Métricas custom
class BusinessMetrics:
"""Métricas de negocio"""
def __init__(self):
self.revenue = Counter(
'revenue_total_dollars',
'Total revenue',
['product_category']
)
self.cart_size = Histogram(
'cart_size_items',
'Items per cart',
buckets=[1, 5, 10, 20, 50]
)
self.checkout_duration = Histogram(
'checkout_duration_seconds',
'Time to complete checkout',
buckets=[1, 5, 10, 30, 60, 120]
)
def record_sale(self, category: str, amount: float):
"""Registrar venta"""
self.revenue.labels(product_category=category).inc(amount)
def record_cart(self, items: int):
"""Registrar tamaño de carrito"""
self.cart_size.observe(items)
def record_checkout(self, duration: float):
"""Registrar tiempo de checkout"""
self.checkout_duration.observe(duration)
business_metrics = BusinessMetrics()
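A usage sketch for the business metrics above (the `/api/checkout` endpoint and its field names are illustrative assumptions, not part of the recipe):
@app.route('/api/checkout', methods=['POST'])
def checkout():
    start = time.time()
    cart = request.json
    # Record cart size, revenue by category, and checkout duration
    business_metrics.record_cart(len(cart['items']))
    order = process_order(cart)
    business_metrics.record_sale(cart.get('category', 'general'), order['total'])
    business_metrics.record_checkout(time.time() - start)
    return jsonify(order), 201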
Prometheus Configuration:
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
# API metrics
- job_name: 'myapp-api'
static_configs:
- targets: ['api:3000']
metrics_path: '/metrics'
# Kubernetes pods
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
# Alert rules
rule_files:
- 'alerts.yml'
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
Alert Rules:
# alerts.yml
groups:
- name: api_alerts
interval: 30s
rules:
# High error rate
- alert: HighErrorRate
expr: |
(
sum(rate(http_requests_total{status=~"5.."}[5m]))
/
sum(rate(http_requests_total[5m]))
) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }}"
# High latency
- alert: HighLatency
expr: |
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P95 latency is {{ $value }}s"
# High memory usage
- alert: HighMemoryUsage
expr: |
(
node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes
) / node_memory_MemTotal_bytes > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value | humanizePercentage }}"
3. TRACES - OpenTelemetry:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
# Configurar tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Configurar exporter (Jaeger)
jaeger_exporter = JaegerExporter(
agent_host_name='jaeger',
agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# Auto-instrumentar Flask
FlaskInstrumentor().instrument_app(app)
# Auto-instrumentar requests HTTP
RequestsInstrumentor().instrument()
# Auto-instrumentar SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=db.engine)
# Traces manuales
@app.route('/api/orders/<order_id>')
def get_order(order_id):
with tracer.start_as_current_span('get_order') as span:
# Agregar atributos al span
span.set_attribute('order.id', order_id)
span.set_attribute('user.id', g.user['id'])
# Buscar orden
with tracer.start_as_current_span('database.query'):
order = db.orders.find_one({'_id': order_id})
if not order:
span.set_attribute('order.found', False)
return jsonify({'error': 'Not found'}), 404
span.set_attribute('order.found', True)
span.set_attribute('order.total', order['total'])
# Enrich with user data
with tracer.start_as_current_span('enrich_order_data'):
enriched_order = enrich_order(order)
return jsonify(enriched_order)
def enrich_order(order):
"""Enrich an order with additional data"""
with tracer.start_as_current_span('fetch_user_data') as span:
span.set_attribute('user.id', order['user_id'])
user = fetch_user(order['user_id'])
with tracer.start_as_current_span('fetch_products'):
products = fetch_products([item['product_id'] for item in order['items']])
return {
**order,
'user': user,
'products': products
}
# Distributed tracing context propagation
import requests
def call_external_service(data):
"""Llamar servicio externo con trace context"""
with tracer.start_as_current_span('external_api_call') as span:
# El context se propaga automáticamente en headers
response = requests.post(
'https://external-service.com/api',
json=data
)
span.set_attribute('http.status_code', response.status_code)
return response.json()
Receta 4.10: Grafana Dashboards
Dashboard Configuration:
{
"dashboard": {
"title": "API Performance Dashboard",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "sum(rate(http_requests_total[5m])) by (status)"
}
],
"type": "graph"
},
{
"title": "Error Rate",
"targets": [
{
"expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
}
],
"type": "stat",
"fieldConfig": {
"defaults": {
"unit": "percentunit",
"thresholds": {
"mode": "absolute",
"steps": [
{"value": 0, "color": "green"},
{"value": 0.01, "color": "yellow"},
{"value": 0.05, "color": "red"}
]
}
}
}
},
{
"title": "Latency (P95)",
"targets": [
{
"expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint))"
}
],
"type": "graph"
},
{
"title": "Active Users",
"targets": [
{
"expr": "active_users"
}
],
"type": "stat"
}
]
}
}
Loki - Log Aggregation:
# promtail-config.yml (Log shipper)
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
# Logs de contenedores Docker
- job_name: docker
static_configs:
- targets:
- localhost
labels:
job: docker
__path__: /var/lib/docker/containers/*/*.log
pipeline_stages:
# Parsear JSON
- json:
expressions:
level: level
message: message
timestamp: timestamp
# Extraer labels
- labels:
level:
# Formatear timestamp
- timestamp:
source: timestamp
format: RFC3339
LogQL Queries (Loki Query Language):
# All API logs
{job="api"}
# Error logs
{job="api"} |= "ERROR"
# Logs for a specific user
{job="api"} | json | user_id="12345"
# Error count per endpoint
sum by (endpoint) (
count_over_time({job="api"} |= "ERROR" [5m])
)
# Average latency
avg_over_time({job="api"} | json | unwrap duration [5m])
# Top 10 slowest endpoints
topk(10,
avg_over_time({job="api"} | json | unwrap duration [5m]) by (endpoint)
)
Recipe 4.11: SLIs, SLOs, SLAs, and Error Budgets
Definitions:
- SLI (Service Level Indicator): a metric that measures one aspect of the service
- SLO (Service Level Objective): the target for an SLI
- SLA (Service Level Agreement): a contract with consequences if the SLO is not met
- Error Budget: the allowed margin of error (100% - SLO)
Example SLIs:
from dataclasses import dataclass
from typing import List
from datetime import datetime, timedelta
@dataclass
class SLI:
"""Service Level Indicator"""
name: str
description: str
query: str # Prometheus query
good_threshold: float # Lo que se considera "bueno"
# Definir SLIs
SLIS = [
SLI(
name='availability',
description='Percentage of successful requests',
query='''
sum(rate(http_requests_total{status!~"5.."}[5m]))
/
sum(rate(http_requests_total[5m]))
''',
good_threshold=0.99 # 99% of requests succeed
),
SLI(
name='latency',
description='95th percentile latency',
query='''
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)
''',
good_threshold=0.5 # P95 < 500ms
),
SLI(
name='error_rate',
description='Percentage of error responses',
query='''
sum(rate(http_requests_total{status=~"5.."}[5m]))
/
sum(rate(http_requests_total[5m]))
''',
good_threshold=0.01 # < 1% errors
)
]
@dataclass
class SLO:
"""Service Level Objective"""
sli: SLI
target: float # Target (e.g. 0.999 for 99.9%)
window: timedelta # Measurement window (e.g. 30 days)
# Define SLOs
SLOS = [
SLO(
sli=SLIS[0], # availability
target=0.999, # 99.9%
window=timedelta(days=30)
),
SLO(
sli=SLIS[1], # latency
target=0.95, # 95% de requests < 500ms
window=timedelta(days=30)
)
]
class ErrorBudget:
"""Cálculo de error budget"""
def __init__(self, slo: SLO):
self.slo = slo
def calculate_budget(self, total_requests: int) -> dict:
"""
Compute the error budget
Error Budget = (1 - SLO) × Total Requests
"""
error_budget = (1 - self.slo.target) * total_requests
return {
'slo_target': self.slo.target,
'total_requests': total_requests,
'allowed_errors': int(error_budget),
'allowed_downtime_minutes': self._calculate_downtime()
}
def _calculate_downtime(self) -> float:
"""Calcular downtime permitido"""
window_minutes = self.slo.window.total_seconds() / 60
downtime = window_minutes * (1 - self.slo.target)
return downtime
def is_budget_exhausted(self, actual_errors: int, total_requests: int) -> bool:
"""Verificar si se agotó el error budget"""
budget = self.calculate_budget(total_requests)
return actual_errors >= budget['allowed_errors']
# Uso
availability_slo = SLOS[0]
error_budget = ErrorBudget(availability_slo)
# Over 30 days with 10M requests
budget = error_budget.calculate_budget(10_000_000)
print(f"Allowed errors: {budget['allowed_errors']}") # 10,000
print(f"Allowed downtime: {budget['allowed_downtime_minutes']:.2f} minutes") # 43.2 min
# Check whether the budget is exhausted
actual_errors = 5000
is_exhausted = error_budget.is_budget_exhausted(actual_errors, 10_000_000)
print(f"Budget exhausted: {is_exhausted}") # False
Error-budget-based alerting:
# error-budget-alerts.yml
groups:
- name: error_budget
rules:
# 50% of the error budget consumed
- alert: ErrorBudget50PercentConsumed
expr: |
(
1 - (
sum(rate(http_requests_total{status!~"5.."}[30d]))
/
sum(rate(http_requests_total[30d]))
)
) / (1 - 0.999) > 0.5
labels:
severity: warning
annotations:
summary: "50% of error budget consumed"
description: "We've used half our error budget for this month"
# 90% of the error budget consumed
- alert: ErrorBudget90PercentConsumed
expr: |
(
1 - (
sum(rate(http_requests_total{status!~"5.."}[30d]))
/
sum(rate(http_requests_total[30d]))
)
) / (1 - 0.999) > 0.9
labels:
severity: critical
annotations:
summary: "90% of error budget consumed"
description: "URGENT: Error budget almost exhausted. Freeze deployments."
# High burn rate (budget being consumed quickly)
- alert: HighErrorBudgetBurnRate
expr: |
(
sum(rate(http_requests_total{status=~"5.."}[1h]))
/
sum(rate(http_requests_total[1h]))
) > (14.4 * (1 - 0.999))
for: 5m
labels:
severity: critical
annotations:
summary: "High error budget burn rate"
description: "At this rate, we'll exhaust error budget in 2 days"
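The 14.4 multiplier in the burn-rate alert is not arbitrary: for a 30-day window, an error rate 14.4 times the budget rate exhausts the entire monthly budget in roughly two days, which is what the alert description warns about. A quick sanity check of that arithmetic:
# Where the 14.4 burn-rate multiplier comes from (30-day window, 99.9% SLO)
slo = 0.999
window_days = 30
budget_rate = 1 - slo                          # 0.1% of requests may fail
burn_rate = 14.4                               # multiplier used in the alert expression
observed_error_rate = burn_rate * budget_rate  # ~1.44% sustained error rate
days_to_exhaust = window_days / burn_rate      # 30 / 14.4 ≈ 2.08 days
print(f"{observed_error_rate:.2%} errors sustained -> budget gone in {days_to_exhaust:.1f} days")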
SLO dashboard:
# Generate SLO reports
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta
class SLOReporter:
"""Generador de reportes de SLOs"""
def __init__(self, prometheus_url: str):
self.prom = PrometheusConnect(url=prometheus_url)
def generate_report(self, slo: SLO) -> dict:
"""Generar reporte de SLO"""
end_time = datetime.now()
start_time = end_time - slo.window
# Ejecutar query
result = self.prom.custom_query_range(
query=slo.sli.query,
start_time=start_time,
end_time=end_time,
step='1h'
)
# Calcular cumplimiento
values = [float(point[1]) for point in result[0]['values']]
avg_value = sum(values) / len(values)
# Comparar con target
is_meeting_slo = avg_value >= slo.target
# Calcular error budget
total_requests = self._get_total_requests(start_time, end_time)
error_budget = ErrorBudget(slo)
budget_info = error_budget.calculate_budget(total_requests)
return {
'slo_name': slo.sli.name,
'target': slo.target,
'actual': avg_value,
'is_meeting_slo': is_meeting_slo,
'margin': avg_value - slo.target,
'error_budget': budget_info,
'period': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
}
}
def _get_total_requests(self, start_time, end_time) -> int:
"""Obtener total de requests en período"""
query = 'sum(increase(http_requests_total[30d]))'
result = self.prom.custom_query(query)
return int(float(result[0]['value'][1]))
# Uso
reporter = SLOReporter('http://prometheus:9090')
for slo in SLOS:
report = reporter.generate_report(slo)
print(f"\n=== {report['slo_name'].upper()} ===")
print(f"Target: {report['target']*100}%")
print(f"Actual: {report['actual']*100:.2f}%")
print(f"Status: {'✅ MEETING' if report['is_meeting_slo'] else '❌ FAILING'}")
print(f"Margin: {report['margin']*100:+.2f}%")
print(f"Error Budget:")
print(f" Allowed errors: {report['error_budget']['allowed_errors']}")
print(f" Allowed downtime: {report['error_budget']['allowed_downtime_minutes']:.1f} min")
Recipe 4.12: Incident Response
On-Call Runbook:
# Incident Response Runbook
## Severity Levels
### SEV-1 (Critical)
- Complete service outage
- Data loss/corruption
- Security breach
- Response: Immediate (24/7)
### SEV-2 (High)
- Partial service degradation
- Major feature broken
- Performance severely impacted
- Response: Within 1 hour
### SEV-3 (Medium)
- Minor feature broken
- Workaround available
- Response: Within 4 hours
### SEV-4 (Low)
- Cosmetic issues
- Documentation errors
- Response: Next business day
## Incident Response Process
1. **Detection**
- Alert fired
- User report
- Monitoring detected
2. **Triage**
- Classify severity
- Page on-call engineer
- Create incident channel
3. **Mitigation**
- Stop the bleeding
- Implement workaround
- Rollback if needed
4. **Investigation**
- Identify root cause
- Gather evidence
- Document timeline
5. **Resolution**
- Deploy fix
- Verify resolution
- Monitor for recurrence
6. **Post-Mortem**
- Write incident report
- Identify action items
- Update runbooks
## Common Issues
### High Error Rate
**Symptoms:**
- Error rate > 5%
- 5xx responses increasing
**Investigation:**
# Check error logs
kubectl logs -l app=api --tail=100 | grep ERROR
# Check recent deployments
kubectl rollout history deployment/api
# Check resource usage
kubectl top pods -l app=api
Mitigation:
# Rollback deployment
kubectl rollout undo deployment/api
# Scale up
kubectl scale deployment/api --replicas=10
# Check database
psql -c "SELECT * FROM pg_stat_activity WHERE state = 'active';"
High Latency
Symptoms:
- P95 latency > 1s
- Slow response times
Investigation:
# Check slow queries
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
# Check cache hit rate
redis-cli info stats | grep keyspace
# Check external services
curl -w "@curl-format.txt" https://external-api.com/health
Mitigation:
- Add caching
- Optimize queries
- Add read replicas
- Enable CDN
Database Connection Pool Exhausted
Symptoms:
- “too many connections” errors
- Long connection wait times
Investigation:
-- Check active connections
SELECT count(*) FROM pg_stat_activity;
-- Check by state
SELECT state, count(*) FROM pg_stat_activity GROUP BY state;
-- Check long-running queries
SELECT pid, now() - query_start as duration, query
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY duration DESC;
Mitigation:
# Kill long-running queries
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND state = 'active'
AND now() - query_start > interval '5 minutes';
Escalation Path
- On-call engineer
- Team lead
- Engineering manager
- VP Engineering
- CTO
Communication Templates
Status Update
**Incident:** [Brief description]
**Severity:** SEV-X
**Status:** Investigating/Mitigating/Resolved
**Impact:** [Who/what is affected]
**ETA:** [Expected resolution time]
**Next Update:** [When]
Resolution Message
**Incident Resolved**
**Root Cause:** [What went wrong]
**Fix:** [What we did]
**Prevention:** [How we'll prevent recurrence]
**Duration:** [How long it lasted]
**Incident Management Tool:**
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional
class Severity(Enum):
SEV1 = "critical"
SEV2 = "high"
SEV3 = "medium"
SEV4 = "low"
class IncidentStatus(Enum):
INVESTIGATING = "investigating"
IDENTIFIED = "identified"
MONITORING = "monitoring"
RESOLVED = "resolved"
@dataclass
class Incident:
"""Incident tracking"""
id: str
title: str
severity: Severity
status: IncidentStatus
created_at: datetime
detected_by: str
assigned_to: Optional[str] = None
description: str = ""
timeline: List[dict] = None
root_cause: Optional[str] = None
resolution: Optional[str] = None
def __post_init__(self):
if self.timeline is None:
self.timeline = []
self.add_timeline_event(
f"Incident created by {self.detected_by}",
IncidentStatus.INVESTIGATING
)
def add_timeline_event(self, description: str, new_status: IncidentStatus = None):
"""Agregar evento al timeline"""
event = {
'timestamp': datetime.utcnow(),
'description': description
}
if new_status:
event['status_change'] = {
'from': self.status.value,
'to': new_status.value
}
self.status = new_status
self.timeline.append(event)
def assign(self, engineer: str):
"""Asignar a ingeniero"""
self.assigned_to = engineer
self.add_timeline_event(f"Assigned to {engineer}")
def update_status(self, new_status: IncidentStatus, note: str):
"""Actualizar status"""
self.add_timeline_event(note, new_status)
def resolve(self, root_cause: str, resolution: str):
"""Resolver incident"""
self.root_cause = root_cause
self.resolution = resolution
self.update_status(
IncidentStatus.RESOLVED,
f"Incident resolved. Root cause: {root_cause}"
)
def generate_report(self) -> str:
"""Generar reporte de incident"""
duration = datetime.utcnow() - self.created_at
report = f"""
# Incident Report: {self.title}
**Incident ID:** {self.id}
**Severity:** {self.severity.value}
**Status:** {self.status.value}
**Duration:** {duration}
**Detected By:** {self.detected_by}
**Assigned To:** {self.assigned_to}
## Description
{self.description}
## Timeline
"""
for event in self.timeline:
report += f"\n- **{event['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}**: {event['description']}"
if self.root_cause:
report += f"\n\n## Root Cause\n{self.root_cause}"
if self.resolution:
report += f"\n\n## Resolution\n{self.resolution}"
return report
# Uso
incident = Incident(
id="INC-2024-001",
title="High error rate in API",
severity=Severity.SEV1,
status=IncidentStatus.INVESTIGATING,
created_at=datetime.utcnow(),
detected_by="monitoring-system",
description="Error rate spiked to 15% at 14:23 UTC"
)
incident.assign("alice@example.com")
incident.add_timeline_event("Identified database connection pool exhaustion")
incident.update_status(IncidentStatus.IDENTIFIED, "Root cause identified")
incident.add_timeline_event("Increased pool size from 20 to 50")
incident.update_status(IncidentStatus.MONITORING, "Monitoring for stability")
incident.resolve(
root_cause="Database connection pool too small for traffic spike",
resolution="Increased pool size and added auto-scaling"
)
print(incident.generate_report())
Congratulations! 🎉
You have completed PHASE 4: Security and Observability of the developer-of-the-future roadmap.
What you have learned:
✅ OWASP Top 10 - critical vulnerabilities
✅ Injection attacks - SQL, NoSQL, Command
✅ XSS, CSRF, and their protections
✅ Secure authentication and authorization
✅ MFA and RBAC
✅ Encryption and secrets management
✅ Static analysis - SonarQube, CodeQL
✅ Dependency scanning
✅ Structured logging
✅ Metrics with Prometheus
✅ Distributed tracing with OpenTelemetry
✅ Grafana dashboards
✅ SLIs, SLOs, SLAs, and error budgets
✅ Incident response and runbooks
Next steps:
PHASE 5: Data Engineering
- Data pipelines (ETL vs ELT)
- Apache Spark, Airflow
- Data warehousing vs data lakes
- Stream processing
PHASE 6: AI and Collaborating with AI
- ML/LLM fundamentals
- Prompt engineering
- RAG and vector databases
- Fine-tuning vs embeddings
Version: 1.0
Date: 2024
Author: Roadmap del Desarrollador del Futuro
License: Educational use