🧑💻 Developer Cookbook - FASE 1: Fundamentos Reforzados
Recetas prácticas para dominar los fundamentos técnicos del desarrollo de software
📚 Tabla de Contenidos
- Estructuras de Datos y Algoritmos
- Patrones de Diseño
- Lectura y Refactorización de Código
- Sistemas Operativos y Redes
Estructuras de Datos y Algoritmos
Receta 1.1: Big O Notation - Midiendo el Rendimiento
¿Qué es? Notación matemática que describe cómo crece el tiempo de ejecución o el uso de memoria de un algoritmo en relación con el tamaño de la entrada.
Casos de uso:
- Comparar eficiencia entre algoritmos
- Predecir escalabilidad de tu código
- Optimizar cuellos de botella en producción
Complejidades comunes:
| Notación | Nombre | Ejemplo |
|---|---|---|
| O(1) | Constante | Acceso a array por índice |
| O(log n) | Logarítmica | Búsqueda binaria |
| O(n) | Lineal | Recorrer un array |
| O(n log n) | Lineal-logarítmica | Merge sort, Quick sort |
| O(n²) | Cuadrática | Bubble sort (nested loops) |
| O(2ⁿ) | Exponencial | Fibonacci recursivo ingenuo |
Ejemplo práctico:
# ❌ MAL: O(n²) - Buscar duplicados con nested loops
def has_duplicates_slow(arr):
for i in range(len(arr)):
for j in range(i + 1, len(arr)):
if arr[i] == arr[j]:
return True
return False
# ✅ BIEN: O(n) - Usar un set
def has_duplicates_fast(arr):
seen = set()
for num in arr:
if num in seen:
return True
seen.add(num)
return False
# Prueba con 10,000 elementos
import time
data = list(range(10000)) + [5000]
start = time.time()
has_duplicates_slow(data)
print(f"Slow: {time.time() - start:.4f}s") # ~0.5s
start = time.time()
has_duplicates_fast(data)
print(f"Fast: {time.time() - start:.4f}s") # ~0.001s
Cuándo importa:
- Datasets grandes (>1000 elementos)
- Operaciones frecuentes (cada request HTTP)
- Recursos limitados (móviles, IoT)
Receta 1.2: Arrays vs Listas Enlazadas
¿Qué son?
- Array: Bloques contiguos de memoria, acceso directo por índice
- Lista Enlazada: Nodos dispersos con punteros al siguiente
Comparación de operaciones:
| Operación | Array | Lista Enlazada |
|---|---|---|
| Acceso por índice | O(1) | O(n) |
| Inserción al inicio | O(n) | O(1) |
| Inserción al final | O(1)* | O(1) |
| Búsqueda | O(n) | O(n) |
| Eliminación | O(n) | O(1)** |
*Con capacidad dinámica puede ser O(n) en resize
**Si ya tienes el nodo
Caso de uso: Sistema de deshacer/rehacer
class Node:
def __init__(self, action):
self.action = action
self.prev = None
self.next = None
class UndoRedoManager:
"""Lista doblemente enlazada para deshacer/rehacer acciones"""
def __init__(self):
self.current = None
self.head = None
def do_action(self, action):
"""Ejecutar nueva acción"""
node = Node(action)
if self.current:
# Eliminar cualquier acción "redo" disponible
self.current.next = node
node.prev = self.current
else:
self.head = node
self.current = node
action.execute()
def undo(self):
"""Deshacer última acción - O(1)"""
if not self.current:
return "Nothing to undo"
self.current.action.rollback()
self.current = self.current.prev
return "Undone"
def redo(self):
"""Rehacer acción - O(1)"""
if not self.current or not self.current.next:
return "Nothing to redo"
self.current = self.current.next
self.current.action.execute()
return "Redone"
# Ejemplo de uso
class TextAction:
def __init__(self, text):
self.text = text
self.document = []
def execute(self):
self.document.append(self.text)
print(f"Added: {self.text}")
def rollback(self):
self.document.pop()
print(f"Removed: {self.text}")
manager = UndoRedoManager()
manager.do_action(TextAction("Hello"))
manager.do_action(TextAction("World"))
manager.undo() # Removed: World
manager.redo() # Added: World
Cuándo usar cada uno:
- Array: Necesitas acceso rápido por índice, tamaño relativamente estable
- Lista Enlazada: Muchas inserciones/eliminaciones, no necesitas acceso aleatorio
Receta 1.3: Stacks y Queues - LIFO vs FIFO
¿Qué son?
- Stack (Pila): Last In, First Out - como una pila de platos
- Queue (Cola): First In, First Out - como una fila del banco
Caso de uso real: Sistema de procesamiento de tareas
from collections import deque
from datetime import datetime
class TaskQueue:
"""Queue para procesamiento FIFO de tareas"""
def __init__(self):
self.queue = deque()
def enqueue(self, task):
"""Agregar tarea al final"""
self.queue.append(task)
print(f"📥 Enqueued: {task['name']}")
def dequeue(self):
"""Procesar siguiente tarea (la más antigua)"""
if not self.queue:
return None
task = self.queue.popleft()
print(f"⚙️ Processing: {task['name']}")
return task
class CallStack:
"""Stack para rastrear ejecución de funciones"""
def __init__(self):
self.stack = []
def push(self, function_name):
"""Entrar a una función"""
self.stack.append({
'name': function_name,
'timestamp': datetime.now()
})
print(f"→ Entering: {function_name}")
def pop(self):
"""Salir de una función"""
if not self.stack:
return None
func = self.stack.pop()
duration = (datetime.now() - func['timestamp']).total_seconds()
print(f"← Exiting: {func['name']} ({duration:.3f}s)")
return func
def peek(self):
"""Ver función actual sin removerla"""
return self.stack[-1] if self.stack else None
# Ejemplo 1: Task Queue (FIFO)
print("=== QUEUE Example ===")
task_queue = TaskQueue()
task_queue.enqueue({'name': 'Send email', 'priority': 1})
task_queue.enqueue({'name': 'Generate report', 'priority': 2})
task_queue.enqueue({'name': 'Backup database', 'priority': 3})
task_queue.dequeue() # Procesa "Send email" primero
task_queue.dequeue() # Luego "Generate report"
# Ejemplo 2: Call Stack (LIFO)
print("\n=== STACK Example ===")
call_stack = CallStack()
def function_a():
call_stack.push('function_a')
function_b()
call_stack.pop()
def function_b():
call_stack.push('function_b')
function_c()
call_stack.pop()
def function_c():
call_stack.push('function_c')
# Hacer algo
call_stack.pop()
function_a()
# Output muestra el orden LIFO de ejecución
Aplicaciones reales:
-
Stack:
- Navegación del navegador (back/forward)
- Deshacer/Rehacer en editores
- Evaluación de expresiones (calculadoras)
- Call stack de debuggers
-
Queue:
- Message queues (RabbitMQ, SQS)
- Print spoolers
- Breadth-First Search
- Rate limiting buffers
Receta 1.4: Árboles Binarios de Búsqueda (BST)
¿Qué es? Estructura jerárquica donde cada nodo tiene máximo 2 hijos, y para cada nodo: todos los valores a la izquierda son menores, todos a la derecha son mayores.
Ventajas:
- Búsqueda O(log n) en promedio
- Inserción/eliminación O(log n)
- Datos ordenados automáticamente
Caso de uso: Sistema de autocompletado
class TreeNode:
def __init__(self, word, frequency=1):
self.word = word
self.frequency = frequency
self.left = None
self.right = None
class AutocompleteSystem:
"""BST para sugerencias de búsqueda"""
def __init__(self):
self.root = None
def insert(self, word):
"""Agregar palabra al diccionario"""
if not self.root:
self.root = TreeNode(word)
return
current = self.root
while True:
if word < current.word:
if current.left is None:
current.left = TreeNode(word)
break
current = current.left
elif word > current.word:
if current.right is None:
current.right = TreeNode(word)
break
current = current.right
else:
# Palabra ya existe, incrementar frecuencia
current.frequency += 1
break
def search_prefix(self, prefix):
"""Encontrar todas las palabras con el prefijo"""
results = []
def traverse(node):
if not node:
return
if node.word.startswith(prefix):
results.append((node.word, node.frequency))
# Solo explorar ramas relevantes
if prefix <= node.word:
traverse(node.left)
if prefix >= node.word[:len(prefix)]:
traverse(node.right)
traverse(self.root)
# Ordenar por frecuencia (más populares primero)
return sorted(results, key=lambda x: x[1], reverse=True)
# Ejemplo de uso
autocomplete = AutocompleteSystem()
# Insertar palabras con sus búsquedas
searches = ['python', 'javascript', 'java', 'python', 'php',
'python', 'javascript', 'java', 'ruby', 'python']
for search in searches:
autocomplete.insert(search)
# Autocompletar "py"
suggestions = autocomplete.search_prefix('py')
print("Suggestions for 'py':", suggestions)
# Output: [('python', 4), ('php', 1)]
# Autocompletar "ja"
suggestions = autocomplete.search_prefix('ja')
print("Suggestions for 'ja':", suggestions)
# Output: [('java', 2), ('javascript', 2)]
Cuándo NO usar BST:
- Datos que llegan ordenados (degenera a lista enlazada)
- Necesitas garantías de O(log n) siempre (usa AVL o Red-Black Tree)
- Acceso frecuente por índice (usa array)
Receta 1.5: Hash Tables - El poder del O(1)
¿Qué es? Estructura que mapea keys a values usando una función hash, permitiendo búsqueda, inserción y eliminación en tiempo constante promedio.
Caso de uso: Cache de resultados costosos
import time
import hashlib
class CacheWithExpiry:
"""Hash table con expiración de entradas"""
def __init__(self, ttl_seconds=60):
self.cache = {}
self.ttl = ttl_seconds
self.hits = 0
self.misses = 0
def _hash_key(self, key):
"""Generar hash consistente para cualquier tipo de key"""
return hashlib.md5(str(key).encode()).hexdigest()
def get(self, key):
"""Obtener valor del cache"""
hashed_key = self._hash_key(key)
if hashed_key in self.cache:
entry = self.cache[hashed_key]
# Verificar si expiró
if time.time() - entry['timestamp'] < self.ttl:
self.hits += 1
print(f"✓ Cache HIT for {key}")
return entry['value']
else:
# Expiró, eliminar
del self.cache[hashed_key]
self.misses += 1
print(f"✗ Cache MISS for {key}")
return None
def set(self, key, value):
"""Guardar en cache"""
hashed_key = self._hash_key(key)
self.cache[hashed_key] = {
'value': value,
'timestamp': time.time()
}
print(f"💾 Cached: {key}")
def stats(self):
"""Estadísticas del cache"""
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return f"Hits: {self.hits}, Misses: {self.misses}, Rate: {hit_rate:.1f}%"
# Simulación de función costosa
def expensive_computation(n):
"""Simula cálculo pesado (ej: consulta a DB)"""
print(f"⏳ Computing fibonacci({n})...")
time.sleep(0.5) # Simular delay
if n <= 1:
return n
a, b = 0, 1
for _ in range(n - 1):
a, b = b, a + b
return b
# Usar cache
cache = CacheWithExpiry(ttl_seconds=5)
# Primera llamada - MISS
result = cache.get('fib_10')
if result is None:
result = expensive_computation(10)
cache.set('fib_10', result)
# Segunda llamada - HIT
result = cache.get('fib_10')
# Esperar expiración
print("\n⏰ Waiting for cache expiry...")
time.sleep(6)
# Tercera llamada - MISS (expiró)
result = cache.get('fib_10')
if result is None:
result = expensive_computation(10)
cache.set('fib_10', result)
print(f"\n📊 {cache.stats()}")
Colisiones y resolución:
class SimpleHashTable:
"""Hash table con chaining para manejar colisiones"""
def __init__(self, size=10):
self.size = size
self.table = [[] for _ in range(size)]
def _hash(self, key):
"""Simple hash function"""
return sum(ord(c) for c in str(key)) % self.size
def insert(self, key, value):
"""Insertar con manejo de colisiones"""
index = self._hash(key)
bucket = self.table[index]
# Actualizar si existe
for i, (k, v) in enumerate(bucket):
if k == key:
bucket[i] = (key, value)
return
# Agregar nuevo
bucket.append((key, value))
print(f"Inserted '{key}' at index {index} (bucket size: {len(bucket)})")
def get(self, key):
"""Buscar valor"""
index = self._hash(key)
bucket = self.table[index]
for k, v in bucket:
if k == key:
return v
return None
# Demostración de colisiones
ht = SimpleHashTable(size=5)
ht.insert("cat", "🐱") # hash = 3
ht.insert("act", "🎭") # hash = 3 (colisión!)
ht.insert("dog", "🐶") # hash = 2
print(ht.get("cat")) # 🐱
print(ht.get("act")) # 🎭
Aplicaciones reales:
- Databases (índices)
- Caches (Redis, Memcached)
- Deduplicación de datos
- Contadores y frecuencias
- Mapeo de URLs a rutas
Receta 1.6: Búsqueda Binaria - Dividir y Conquistar
¿Qué es? Algoritmo de búsqueda en arrays ordenados que divide el espacio de búsqueda a la mitad en cada paso (O(log n)).
Caso de uso: Búsqueda en logs ordenados por timestamp
from datetime import datetime, timedelta
import bisect
class LogSearcher:
"""Búsqueda eficiente en logs ordenados por tiempo"""
def __init__(self):
self.logs = []
self.timestamps = []
def add_log(self, timestamp, message):
"""Agregar log manteniendo orden"""
# bisect.insort es búsqueda binaria + inserción
bisect.insort(self.timestamps, timestamp)
index = bisect.bisect_left(self.timestamps, timestamp)
self.logs.insert(index, {'time': timestamp, 'msg': message})
def find_logs_in_range(self, start_time, end_time):
"""Encontrar logs en rango de tiempo - O(log n)"""
# Encontrar índices con búsqueda binaria
start_idx = bisect.bisect_left(self.timestamps, start_time)
end_idx = bisect.bisect_right(self.timestamps, end_time)
return self.logs[start_idx:end_idx]
def find_first_error_after(self, timestamp):
"""Encontrar primer error después de cierto tiempo"""
idx = bisect.bisect_left(self.timestamps, timestamp)
for i in range(idx, len(self.logs)):
if 'ERROR' in self.logs[i]['msg']:
return self.logs[i]
return None
# Ejemplo de uso
searcher = LogSearcher()
base_time = datetime(2024, 1, 1, 10, 0, 0)
# Agregar logs
logs_data = [
(0, "INFO: Server started"),
(5, "INFO: Connection established"),
(10, "WARNING: High memory usage"),
(15, "ERROR: Connection timeout"),
(20, "INFO: Retry successful"),
(30, "ERROR: Database error"),
]
for offset, msg in logs_data:
searcher.add_log(base_time + timedelta(seconds=offset), msg)
# Buscar logs en ventana de tiempo
start = base_time + timedelta(seconds=10)
end = base_time + timedelta(seconds=20)
results = searcher.find_logs_in_range(start, end)
print("Logs between 10s and 20s:")
for log in results:
print(f" {log['time'].strftime('%H:%M:%S')} - {log['msg']}")
# Encontrar primer error después de los 12 segundos
error = searcher.find_first_error_after(base_time + timedelta(seconds=12))
print(f"\nFirst error after 12s: {error['msg']}")
Implementación clásica:
def binary_search(arr, target):
"""Búsqueda binaria iterativa"""
left, right = 0, len(arr) - 1
while left <= right:
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1 # No encontrado
def binary_search_recursive(arr, target, left=0, right=None):
"""Búsqueda binaria recursiva"""
if right is None:
right = len(arr) - 1
if left > right:
return -1
mid = (left + right) // 2
if arr[mid] == target:
return mid
elif arr[mid] < target:
return binary_search_recursive(arr, target, mid + 1, right)
else:
return binary_search_recursive(arr, target, left, mid - 1)
# Prueba
sorted_array = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
print(binary_search(sorted_array, 7)) # 3
print(binary_search(sorted_array, 8)) # -1
Variaciones útiles:
- bisect_left: Encuentra posición donde insertar manteniendo orden
- bisect_right: Encuentra posición después del último elemento igual
- Búsqueda en respuesta: Encontrar valor óptimo en rango (ej: capacidad mínima de servidor)
Receta 1.7: Algoritmos de Ordenamiento
Comparación rápida:
| Algoritmo | Tiempo Promedio | Peor Caso | Espacio | Estable? |
|---|---|---|---|---|
| Bubble Sort | O(n²) | O(n²) | O(1) | ✅ Sí |
| Selection Sort | O(n²) | O(n²) | O(1) | ❌ No |
| Insertion Sort | O(n²) | O(n²) | O(1) | ✅ Sí |
| Merge Sort | O(n log n) | O(n log n) | O(n) | ✅ Sí |
| Quick Sort | O(n log n) | O(n²) | O(log n) | ❌ No |
| Heap Sort | O(n log n) | O(n log n) | O(1) | ❌ No |
Caso de uso: Ordenamiento personalizado de productos
from dataclasses import dataclass
from typing import List
import random
@dataclass
class Product:
name: str
price: float
rating: float
sales: int
def __repr__(self):
return f"{self.name} (${self.price}, ⭐{self.rating}, 🛒{self.sales})"
def merge_sort_custom(arr: List[Product], key_func) -> List[Product]:
"""Merge Sort con función de comparación personalizada"""
if len(arr) <= 1:
return arr
mid = len(arr) // 2
left = merge_sort_custom(arr[:mid], key_func)
right = merge_sort_custom(arr[mid:], key_func)
return merge(left, right, key_func)
def merge(left, right, key_func):
"""Combinar dos arrays ordenados"""
result = []
i = j = 0
while i < len(left) and j < len(right):
if key_func(left[i]) <= key_func(right[j]):
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
def quick_sort_custom(arr: List[Product], key_func) -> List[Product]:
"""Quick Sort con función personalizada"""
if len(arr) <= 1:
return arr
pivot = arr[len(arr) // 2]
pivot_value = key_func(pivot)
left = [x for x in arr if key_func(x) < pivot_value]
middle = [x for x in arr if key_func(x) == pivot_value]
right = [x for x in arr if key_func(x) > pivot_value]
return quick_sort_custom(left, key_func) + middle + quick_sort_custom(right, key_func)
# Crear productos de ejemplo
products = [
Product("Laptop Pro", 1299.99, 4.8, 523),
Product("Mouse Wireless", 29.99, 4.2, 1250),
Product("Keyboard RGB", 79.99, 4.5, 892),
Product("Monitor 4K", 449.99, 4.7, 234),
Product("Webcam HD", 89.99, 3.9, 678),
]
# Ordenar por diferentes criterios
print("=== Ordenar por PRECIO (ascendente) ===")
sorted_price = merge_sort_custom(products, lambda p: p.price)
for p in sorted_price:
print(p)
print("\n=== Ordenar por RATING (descendente) ===")
sorted_rating = quick_sort_custom(products, lambda p: -p.rating)
for p in sorted_rating:
print(p)
print("\n=== Ordenar por POPULARIDAD (descendente) ===")
sorted_sales = merge_sort_custom(products, lambda p: -p.sales)
for p in sorted_sales:
print(p)
print("\n=== Ordenar por SCORE COMPUESTO ===")
# Score: 40% rating, 30% sales, 30% precio inverso
def popularity_score(p):
max_sales = max(prod.sales for prod in products)
max_price = max(prod.price for prod in products)
return (
p.rating * 0.4 +
(p.sales / max_sales) * 0.3 +
(1 - p.price / max_price) * 0.3
)
sorted_score = merge_sort_custom(products, lambda p: -popularity_score(p))
for p in sorted_score:
print(f"{p} - Score: {popularity_score(p):.3f}")
Cuándo usar cada uno:
- Bubble/Insertion Sort: Arrays casi ordenados o muy pequeños (<10 elementos)
- Merge Sort: Necesitas estabilidad y O(n log n) garantizado (ej: ordenar registros por múltiples campos)
- Quick Sort: Mejor rendimiento promedio en memoria (in-place), no necesitas estabilidad
- Tim Sort (Python’s default): Híbrido optimizado para datos reales
Receta 1.8: Programación Dinámica - Fibonacci Optimizado
¿Qué es? Técnica para resolver problemas complejos dividiéndolos en subproblemas más simples y almacenando resultados para evitar recálculos.
Problema clásico: Fibonacci
import time
from functools import lru_cache
# ❌ ENFOQUE INGENUO - O(2^n)
def fibonacci_naive(n):
"""Recursivo sin optimización - EXTREMADAMENTE LENTO"""
if n <= 1:
return n
return fibonacci_naive(n - 1) + fibonacci_naive(n - 2)
# ✅ TOP-DOWN (Memoization) - O(n)
@lru_cache(maxsize=None)
def fibonacci_memo(n):
"""Recursivo con caché automático"""
if n <= 1:
return n
return fibonacci_memo(n - 1) + fibonacci_memo(n - 2)
# ✅ BOTTOM-UP (Tabulation) - O(n), más eficiente en espacio
def fibonacci_dp(n):
"""Iterativo construyendo desde abajo"""
if n <= 1:
return n
dp = [0] * (n + 1)
dp[1] = 1
for i in range(2, n + 1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
# ✅ OPTIMIZACIÓN ESPACIAL - O(1) espacio
def fibonacci_optimized(n):
"""Solo mantenemos los últimos 2 valores"""
if n <= 1:
return n
prev, curr = 0, 1
for _ in range(2, n + 1):
prev, curr = curr, prev + curr
return curr
# Comparación de rendimiento
def benchmark(func, n, name):
start = time.time()
result = func(n)
duration = time.time() - start
print(f"{name:20} n={n}: {result:,} ({duration:.6f}s)")
print("=== Comparación de Performance ===")
for n in [10, 20, 30]:
print(f"\nCalculando Fibonacci({n}):")
if n <= 30: # Naive es muy lento para n>30
benchmark(fibonacci_naive, n, "Naive (recursivo)")
benchmark(fibonacci_memo, n, "Memoization")
benchmark(fibonacci_dp, n, "DP Tabulation")
benchmark(fibonacci_optimized, n, "Optimizado")
fibonacci_memo.cache_clear() # Limpiar cache entre pruebas
Caso de uso real: Problema de la mochila (Knapsack)
def knapsack_dp(weights, values, capacity):
"""
Problema de la mochila: maximizar valor sin exceder capacidad.
Usado en: optimización de recursos, presupuestos, scheduling
"""
n = len(weights)
# dp[i][w] = valor máximo con primeros i items y capacidad w
dp = [[0] * (capacity + 1) for _ in range(n + 1)]
for i in range(1, n + 1):
for w in range(capacity + 1):
# Opción 1: No incluir item i
dp[i][w] = dp[i-1][w]
# Opción 2: Incluir item i (si cabe)
if weights[i-1] <= w:
dp[i][w] = max(
dp[i][w],
dp[i-1][w - weights[i-1]] + values[i-1]
)
return dp[n][capacity]
# Ejemplo: Optimizar carga de un servidor
tasks = {
"DB Backup": {"cpu": 2, "value": 10}, # 2 CPUs, prioridad 10
"Analytics": {"cpu": 3, "value": 15}, # 3 CPUs, prioridad 15
"API Server": {"cpu": 1, "value": 20}, # 1 CPU, prioridad 20
"Cache Warming": {"cpu": 2, "value": 8}, # 2 CPUs, prioridad 8
"Log Processing": {"cpu": 1, "value": 5}, # 1 CPU, prioridad 5
}
weights = [task["cpu"] for task in tasks.values()]
values = [task["value"] for task in tasks.values()]
max_cpu = 5 # Capacidad del servidor
max_value = knapsack_dp(weights, values, max_cpu)
print(f"Valor máximo optimizado: {max_value}")
print(f"Con capacidad de {max_cpu} CPUs")
Otros problemas comunes de DP:
- Longest Common Subsequence (diff de archivos)
- Edit Distance (corrección ortográfica)
- Coin Change (hacer cambio con mínimas monedas)
- Path Finding (rutas óptimas en grids)
Patrones de Diseño
Receta 2.1: Singleton - Una única instancia
¿Qué es? Patrón que garantiza que una clase tenga solo una instancia y proporciona un punto de acceso global a ella.
Caso de uso: Configuración global de aplicación
import threading
from typing import Any, Dict
class ConfigurationManager:
"""Singleton thread-safe para gestión de configuración"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
# Double-check locking
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
# Inicializar solo una vez
if not self._initialized:
self._config: Dict[str, Any] = {
'database': {
'host': 'localhost',
'port': 5432,
'pool_size': 10
},
'cache': {
'ttl': 300,
'max_size': 1000
},
'api': {
'rate_limit': 100,
'timeout': 30
}
}
self._initialized = True
def get(self, key: str, default=None):
"""Obtener valor de configuración con notación dot"""
keys = key.split('.')
value = self._config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return default
if value is None:
return default
return value
def set(self, key: str, value: Any):
"""Establecer valor de configuración"""
keys = key.split('.')
config = self._config
for k in keys[:-1]:
config = config.setdefault(k, {})
config[keys[-1]] = value
# Demostración
config1 = ConfigurationManager()
config2 = ConfigurationManager()
print(f"Son la misma instancia: {config1 is config2}") # True
print(f"ID config1: {id(config1)}")
print(f"ID config2: {id(config2)}")
# Configurar desde cualquier parte
config1.set('database.host', 'production.db.com')
config1.set('api.rate_limit', 500)
# Acceder desde cualquier parte
print(f"\nDatabase host: {config2.get('database.host')}")
print(f"API rate limit: {config2.get('api.rate_limit')}")
print(f"Cache TTL: {config2.get('cache.ttl')}")
# Simular uso en diferentes módulos
def module_a():
config = ConfigurationManager()
return config.get('database.pool_size')
def module_b():
config = ConfigurationManager()
config.set('database.pool_size', 20)
module_b()
print(f"\nPool size desde module_a: {module_a()}") # 20
Alternativa moderna: Módulo como Singleton
# config.py
class _Config:
def __init__(self):
self.database_url = "postgresql://localhost/mydb"
self.debug = True
self.secret_key = "dev-secret-123"
# Crear instancia única a nivel de módulo
config = _Config()
# En otros archivos, simplemente:
# from config import config
# print(config.database_url)
Cuándo usar:
- Logging systems
- Connection pools
- Caches globales
- Configuración de aplicación
Cuándo NO usar:
- Dificulta testing (estado global)
- Puede ocultar dependencias
- Problemas con multi-threading si no se implementa bien
Receta 2.2: Factory - Creación de objetos flexible
¿Qué es? Patrón que delega la creación de objetos a subclases o métodos especializados, permitiendo crear objetos sin especificar la clase exacta.
Caso de uso: Parsers de diferentes formatos
from abc import ABC, abstractmethod
import json
import yaml
import csv
from io import StringIO
class DataParser(ABC):
"""Interfaz base para parsers"""
@abstractmethod
def parse(self, data: str) -> dict:
"""Parsear datos al formato estándar"""
pass
@abstractmethod
def serialize(self, data: dict) -> str:
"""Serializar datos del formato estándar"""
pass
class JSONParser(DataParser):
def parse(self, data: str) -> dict:
return json.loads(data)
def serialize(self, data: dict) -> str:
return json.dumps(data, indent=2)
class YAMLParser(DataParser):
def parse(self, data: str) -> dict:
return yaml.safe_load(data)
def serialize(self, data: dict) -> str:
return yaml.dump(data, default_flow_style=False)
class CSVParser(DataParser):
def parse(self, data: str) -> dict:
"""Convertir CSV a dict con primera fila como keys"""
reader = csv.DictReader(StringIO(data))
return {"rows": list(reader)}
def serialize(self, data: dict) -> str:
"""Convertir dict a CSV"""
if not data.get("rows"):
return ""
output = StringIO()
writer = csv.DictWriter(output, fieldnames=data["rows"][0].keys())
writer.writeheader()
writer.writerows(data["rows"])
return output.getvalue()
class ParserFactory:
"""Factory para crear parsers según el tipo"""
_parsers = {
'json': JSONParser,
'yaml': YAMLParser,
'yml': YAMLParser,
'csv': CSVParser,
}
@classmethod
def create_parser(cls, format_type: str) -> DataParser:
"""Crear parser apropiado"""
parser_class = cls._parsers.get(format_type.lower())
if not parser_class:
raise ValueError(f"Formato no soportado: {format_type}")
return parser_class()
@classmethod
def register_parser(cls, format_type: str, parser_class):
"""Registrar nuevo tipo de parser (extensibilidad)"""
cls._parsers[format_type] = parser_class
# Uso del factory
def process_data_file(filename: str, content: str):
"""Procesar archivo sin saber su formato de antemano"""
# Detectar formato por extensión
extension = filename.split('.')[-1]
# Factory crea el parser apropiado
parser = ParserFactory.create_parser(extension)
# Usar el parser de forma polimórfica
data = parser.parse(content)
print(f"\n=== Procesando {filename} ===")
print(f"Tipo de parser: {parser.__class__.__name__}")
print(f"Datos parseados: {data}")
# Serializar de vuelta
serialized = parser.serialize(data)
print(f"Serializado:\n{serialized[:100]}...")
# Ejemplos
json_content = '{"name": "John", "age": 30, "city": "NYC"}'
process_data_file("user.json", json_content)
yaml_content = """
name: Jane
age: 25
city: SF
"""
process_data_file("user.yaml", yaml_content)
csv_content = """name,age,city
Bob,35,LA
Alice,28,Chicago"""
process_data_file("users.csv", csv_content)
# Extensibilidad: Agregar nuevo formato
class XMLParser(DataParser):
def parse(self, data: str) -> dict:
# Implementación simplificada
return {"xml": "parsed data"}
def serialize(self, data: dict) -> str:
return "<data>serialized</data>"
ParserFactory.register_parser('xml', XMLParser)
process_data_file("data.xml", "<root><item>test</item></root>")
Factory Method vs Abstract Factory:
# Factory Method: Una familia de productos
class NotificationFactory:
@staticmethod
def create(channel: str):
if channel == 'email':
return EmailNotification()
elif channel == 'sms':
return SMSNotification()
elif channel == 'push':
return PushNotification()
# Abstract Factory: Familias completas de productos relacionados
class CloudProviderFactory(ABC):
@abstractmethod
def create_compute(self):
pass
@abstractmethod
def create_storage(self):
pass
class AWSFactory(CloudProviderFactory):
def create_compute(self):
return EC2Instance()
def create_storage(self):
return S3Bucket()
class AzureFactory(CloudProviderFactory):
def create_compute(self):
return VirtualMachine()
def create_storage(self):
return BlobStorage()
Receta 2.3: Observer - Notificación de cambios
¿Qué es? Patrón que define una dependencia uno-a-muchos entre objetos, de modo que cuando un objeto cambia de estado, todos sus dependientes son notificados automáticamente.
Caso de uso: Sistema de eventos en una aplicación
from abc import ABC, abstractmethod
from typing import List, Any
from datetime import datetime
class Observer(ABC):
"""Interfaz para observadores"""
@abstractmethod
def update(self, event: str, data: Any):
"""Recibir notificación de cambio"""
pass
class Subject:
"""Sujeto observable"""
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer):
"""Registrar observador"""
if observer not in self._observers:
self._observers.append(observer)
print(f"✓ {observer.__class__.__name__} attached")
def detach(self, observer: Observer):
"""Eliminar observador"""
self._observers.remove(observer)
print(f"✗ {observer.__class__.__name__} detached")
def notify(self, event: str, data: Any = None):
"""Notificar a todos los observadores"""
print(f"\n📢 Notificando evento: {event}")
for observer in self._observers:
observer.update(event, data)
class UserAccount(Subject):
"""Cuenta de usuario observable"""
def __init__(self, username: str):
super().__init__()
self.username = username
self.balance = 0
def deposit(self, amount: float):
"""Depositar dinero"""
self.balance += amount
self.notify('deposit', {
'username': self.username,
'amount': amount,
'new_balance': self.balance,
'timestamp': datetime.now()
})
def withdraw(self, amount: float):
"""Retirar dinero"""
if amount > self.balance:
self.notify('insufficient_funds', {
'username': self.username,
'attempted': amount,
'available': self.balance
})
return False
self.balance -= amount
self.notify('withdrawal', {
'username': self.username,
'amount': amount,
'new_balance': self.balance,
'timestamp': datetime.now()
})
return True
# Observadores concretos
class EmailNotifier(Observer):
"""Envía emails en eventos importantes"""
def update(self, event: str, data: Any):
if event == 'deposit' and data['amount'] >= 1000:
print(f"📧 EMAIL: Large deposit of ${data['amount']:.2f} detected for {data['username']}")
elif event == 'insufficient_funds':
print(f"📧 EMAIL: Insufficient funds alert for {data['username']}")
class SMSNotifier(Observer):
"""Envía SMS para retiros"""
def update(self, event: str, data: Any):
if event == 'withdrawal':
print(f"📱 SMS: ${data['amount']:.2f} withdrawn from your account. New balance: ${data['new_balance']:.2f}")
class AuditLogger(Observer):
"""Registra todas las transacciones"""
def __init__(self):
self.logs = []
def update(self, event: str, data: Any):
log_entry = {
'event': event,
'data': data,
'logged_at': datetime.now()
}
self.logs.append(log_entry)
print(f"📝 AUDIT: Logged {event} event")
class FraudDetector(Observer):
"""Detecta actividad sospechosa"""
def __init__(self):
self.transaction_count = {}
def update(self, event: str, data: Any):
if event in ['deposit', 'withdrawal']:
username = data['username']
self.transaction_count[username] = self.transaction_count.get(username, 0) + 1
if self.transaction_count[username] >= 3:
print(f"🚨 FRAUD ALERT: Multiple transactions ({self.transaction_count[username]}) from {username}")
# Demostración
print("=== Sistema de Observadores ===")
account = UserAccount("john_doe")
# Registrar observadores
email_notifier = EmailNotifier()
sms_notifier = SMSNotifier()
audit_logger = AuditLogger()
fraud_detector = FraudDetector()
account.attach(email_notifier)
account.attach(sms_notifier)
account.attach(audit_logger)
account.attach(fraud_detector)
# Realizar transacciones
account.deposit(500)
account.deposit(1500) # Triggerea email por cantidad grande
account.withdraw(300)
account.withdraw(100)
account.withdraw(50) # Triggerea fraud alert
account.withdraw(5000) # Insufficient funds
# Desregistrar un observador
print("\n--- Removiendo SMS Notifier ---")
account.detach(sms_notifier)
account.deposit(200) # Solo los otros observadores se notifican
# Ver logs de auditoría
print(f"\n📊 Total de logs de auditoría: {len(audit_logger.logs)}")
Aplicaciones reales:
- Event buses (Redux, Vue.js reactivity)
- WebSockets/Real-time updates
- Model-View-Controller (MVC)
- Pub/Sub messaging systems
- Reactive programming (RxJS, React hooks)
Receta 2.4: Strategy - Algoritmos intercambiables
¿Qué es? Patrón que define una familia de algoritmos, encapsula cada uno y los hace intercambiables, permitiendo que el algoritmo varíe independientemente de los clientes que lo usan.
Caso de uso: Sistema de precios con diferentes estrategias
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, date
from typing import List
@dataclass
class Product:
name: str
base_price: float
category: str
@dataclass
class Customer:
name: str
loyalty_level: str # 'bronze', 'silver', 'gold', 'platinum'
member_since: date
class PricingStrategy(ABC):
"""Estrategia base de pricing"""
@abstractmethod
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
"""Calcular precio final"""
pass
@abstractmethod
def get_description(self) -> str:
"""Descripción de la estrategia"""
pass
class RegularPricing(PricingStrategy):
"""Sin descuentos"""
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
return product.base_price * quantity
def get_description(self) -> str:
return "Precio regular sin descuentos"
class LoyaltyPricing(PricingStrategy):
"""Descuentos basados en nivel de lealtad"""
DISCOUNTS = {
'bronze': 0.05, # 5%
'silver': 0.10, # 10%
'gold': 0.15, # 15%
'platinum': 0.20 # 20%
}
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
base_total = product.base_price * quantity
discount = self.DISCOUNTS.get(customer.loyalty_level, 0)
return base_total * (1 - discount)
def get_description(self) -> str:
return "Descuento por nivel de lealtad del cliente"
class BulkPricing(PricingStrategy):
"""Descuentos por volumen"""
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
base_total = product.base_price * quantity
if quantity >= 100:
discount = 0.20 # 20% para 100+
elif quantity >= 50:
discount = 0.15 # 15% para 50+
elif quantity >= 20:
discount = 0.10 # 10% para 20+
else:
discount = 0
return base_total * (1 - discount)
def get_description(self) -> str:
return "Descuento por cantidad (20+ items: 10%, 50+: 15%, 100+: 20%)"
class SeasonalPricing(PricingStrategy):
"""Descuentos por temporada"""
def __init__(self):
self.current_month = datetime.now().month
# Temporada alta: junio-agosto, temporada baja: diciembre-febrero
self.high_season = [6, 7, 8]
self.low_season = [12, 1, 2]
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
base_total = product.base_price * quantity
if self.current_month in self.high_season:
# Precio premium en temporada alta
return base_total * 1.20
elif self.current_month in self.low_season:
# Descuento en temporada baja
return base_total * 0.80
else:
return base_total
def get_description(self) -> str:
return "Ajuste de precios por temporada (+20% alta, -20% baja)"
class ComboStrategy(PricingStrategy):
"""Combina múltiples estrategias tomando el mejor precio"""
def __init__(self, strategies: List[PricingStrategy]):
self.strategies = strategies
def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
# Aplicar todas las estrategias y tomar el precio más bajo
prices = [
strategy.calculate_price(product, customer, quantity)
for strategy in self.strategies
]
return min(prices)
def get_description(self) -> str:
strategy_names = [s.__class__.__name__ for s in self.strategies]
return f"Mejor precio entre: {', '.join(strategy_names)}"
class ShoppingCart:
"""Carrito con estrategia de pricing intercambiable"""
def __init__(self, customer: Customer, pricing_strategy: PricingStrategy):
self.customer = customer
self.pricing_strategy = pricing_strategy
self.items: List[tuple] = [] # (product, quantity)
def add_item(self, product: Product, quantity: int):
"""Agregar producto al carrito"""
self.items.append((product, quantity))
def set_pricing_strategy(self, strategy: PricingStrategy):
"""Cambiar estrategia de pricing"""
self.pricing_strategy = strategy
print(f"\n🔄 Estrategia cambiada a: {strategy.get_description()}")
def calculate_total(self) -> float:
"""Calcular total con la estrategia actual"""
total = 0
for product, quantity in self.items:
item_price = self.pricing_strategy.calculate_price(product, self.customer, quantity)
total += item_price
return total
def show_receipt(self):
"""Mostrar recibo detallado"""
print(f"\n{'='*60}")
print(f"Cliente: {self.customer.name} ({self.customer.loyalty_level.upper()})")
print(f"Estrategia: {self.pricing_strategy.get_description()}")
print(f"{'='*60}")
total = 0
for product, quantity in self.items:
base_total = product.base_price * quantity
final_price = self.pricing_strategy.calculate_price(product, self.customer, quantity)
discount = base_total - final_price
print(f"{product.name:30} x{quantity:3}")
print(f" Base: ${base_total:8.2f} | Final: ${final_price:8.2f} | Ahorro: ${discount:8.2f}")
total += final_price
print(f"{'='*60}")
print(f"TOTAL: ${total:.2f}")
print(f"{'='*60}\n")
# Demostración
laptop = Product("Laptop Pro", 1200.00, "electronics")
mouse = Product("Wireless Mouse", 25.00, "accessories")
keyboard = Product("Mechanical Keyboard", 80.00, "accessories")
customer_gold = Customer("Alice", "gold", date(2020, 1, 15))
# Crear carrito con estrategia inicial
cart = ShoppingCart(customer_gold, RegularPricing())
cart.add_item(laptop, 1)
cart.add_item(mouse, 50)
cart.add_item(keyboard, 2)
# Probar diferentes estrategias
cart.show_receipt()
cart.set_pricing_strategy(LoyaltyPricing())
cart.show_receipt()
cart.set_pricing_strategy(BulkPricing())
cart.show_receipt()
# Usar combo strategy para mejor precio
combo = ComboStrategy([
LoyaltyPricing(),
BulkPricing(),
SeasonalPricing()
])
cart.set_pricing_strategy(combo)
cart.show_receipt()
Cuándo usar Strategy:
- Múltiples algoritmos relacionados (sorting, compression, encryption)
- Eliminar condicionales complejos (if/else chains)
- Configuración en runtime (usuarios eligen comportamiento)
- A/B testing de diferentes implementaciones
Receta 2.5: Decorator - Extender funcionalidad dinámicamente
¿Qué es? Patrón que permite agregar comportamiento adicional a objetos de forma dinámica sin modificar su estructura original.
Caso de uso: Logging y métricas para funciones
import time
import functools
from typing import Callable, Any
import json
def timer(func: Callable) -> Callable:
"""Medir tiempo de ejecución"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
print(f"⏱️ {func.__name__} took {duration:.4f}s")
return result
return wrapper
def logger(func: Callable) -> Callable:
"""Registrar llamadas a función"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
args_repr = [repr(a) for a in args]
kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
print(f"📝 Calling {func.__name__}({signature})")
result = func(*args, **kwargs)
print(f"✓ {func.__name__} returned {result!r}")
return result
return wrapper
def cache(func: Callable) -> Callable:
"""Cachear resultados de función"""
cache_dict = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Crear key del cache
key = str(args) + str(kwargs)
if key in cache_dict:
print(f"💾 Cache HIT for {func.__name__}")
return cache_dict[key]
print(f"🔄 Cache MISS for {func.__name__}, computing...")
result = func(*args, **kwargs)
cache_dict[key] = result
return result
wrapper.cache_clear = lambda: cache_dict.clear()
return wrapper
def retry(max_attempts=3, delay=1):
"""Reintentar función en caso de error"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
print(f"❌ Failed after {max_attempts} attempts")
raise
print(f"⚠️ Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
return wrapper
return decorator
def validate_args(**type_checks):
"""Validar tipos de argumentos"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Validar kwargs
for arg_name, expected_type in type_checks.items():
if arg_name in kwargs:
value = kwargs[arg_name]
if not isinstance(value, expected_type):
raise TypeError(
f"{func.__name__}: '{arg_name}' debe ser {expected_type.__name__}, "
f"recibido {type(value).__name__}"
)
return func(*args, **kwargs)
return wrapper
return decorator
# Ejemplo: Combinar múltiples decorators
@timer
@logger
@cache
def fibonacci(n: int) -> int:
"""Calcular fibonacci con decorators"""
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
@timer
@retry(max_attempts=3, delay=0.5)
def unreliable_api_call(fail_count=2):
"""Simular llamada API que falla aleatoriamente"""
import random
if random.random() < (fail_count / 3):
raise ConnectionError("API timeout")
return {"status": "success", "data": [1, 2, 3]}
@validate_args(amount=float, currency=str)
def process_payment(amount, currency, user_id=None):
"""Procesar pago con validación de tipos"""
print(f"💳 Processing ${amount} {currency} for user {user_id}")
return {"status": "completed"}
# Demostración
print("=== Fibonacci con cache ===")
fibonacci(5)
fibonacci(5) # Segunda llamada usa cache
fibonacci(6) # Reutiliza cálculos previos
print("\n=== API con retry ===")
try:
result = unreliable_api_call()
print(f"Result: {result}")
except Exception as e:
print(f"Final error: {e}")
print("\n=== Validación de argumentos ===")
process_payment(amount=99.99, currency="USD", user_id=12345)
try:
process_payment(amount="99.99", currency="USD") # Error: amount debe ser float
except TypeError as e:
print(f"Error: {e}")
Decorator como clase:
class RateLimiter:
"""Limitar tasa de llamadas a función"""
def __init__(self, max_calls: int, time_window: float):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
def __call__(self, func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
# Limpiar llamadas antiguas
self.calls = [call_time for call_time in self.calls
if now - call_time < self.time_window]
if len(self.calls) >= self.max_calls:
raise Exception(
f"Rate limit exceeded: {self.max_calls} calls per {self.time_window}s"
)
self.calls.append(now)
return func(*args, **kwargs)
return wrapper
@RateLimiter(max_calls=3, time_window=5.0)
def api_endpoint(request_id):
print(f"✓ Processing request {request_id}")
return {"status": "ok"}
# Probar rate limiter
for i in range(5):
try:
api_endpoint(i)
time.sleep(1)
except Exception as e:
print(f"❌ Request {i}: {e}")
Aplicaciones reales:
- Python:
@property,@staticmethod,@classmethod - Web frameworks:
@login_required,@cache_page - Testing:
@mock,@patch - Async:
@async_to_sync
Lectura y Refactorización de Código
Receta 3.1: Code Smells - Identificar problemas
¿Qué son? Indicadores de que el código puede tener problemas de diseño, aunque técnicamente funcione.
Code Smells comunes y soluciones:
# ❌ CODE SMELL 1: Función muy larga (>20 líneas)
def process_order_bad(order):
# Validar orden
if not order.get('items'):
return {"error": "No items"}
if not order.get('customer'):
return {"error": "No customer"}
# Calcular total
total = 0
for item in order['items']:
price = item['price']
quantity = item['quantity']
discount = item.get('discount', 0)
total += price * quantity * (1 - discount)
# Aplicar shipping
if total < 50:
total += 10
# Aplicar impuestos
total *= 1.08
# Guardar en DB
# ... 10 líneas más
return {"total": total}
# ✅ SOLUCIÓN: Extraer métodos (Extract Method)
class OrderProcessor:
TAX_RATE = 0.08
SHIPPING_THRESHOLD = 50
SHIPPING_COST = 10
def process_order(self, order):
"""Punto de entrada claro y conciso"""
if error := self._validate_order(order):
return error
subtotal = self._calculate_subtotal(order['items'])
total = self._apply_fees(subtotal)
return {"total": total}
def _validate_order(self, order):
"""Validación separada"""
if not order.get('items'):
return {"error": "No items"}
if not order.get('customer'):
return {"error": "No customer"}
return None
def _calculate_subtotal(self, items):
"""Cálculo de subtotal aislado"""
return sum(
item['price'] * item['quantity'] * (1 - item.get('discount', 0))
for item in items
)
def _apply_fees(self, subtotal):
"""Aplicar shipping e impuestos"""
with_shipping = self._add_shipping(subtotal)
return with_shipping * (1 + self.TAX_RATE)
def _add_shipping(self, subtotal):
"""Lógica de shipping separada"""
return subtotal + self.SHIPPING_COST if subtotal < self.SHIPPING_THRESHOLD else subtotal
# ❌ CODE SMELL 2: Parámetros largos
def create_user_bad(name, email, age, address, phone, country, city, zip_code, preferences):
pass
# ✅ SOLUCIÓN: Objeto de parámetros
from dataclasses import dataclass
@dataclass
class UserData:
name: str
email: str
age: int
address: str
phone: str
country: str
city: str
zip_code: str
preferences: dict
def create_user_good(user_data: UserData):
# Mucho más legible y extensible
pass
# ❌ CODE SMELL 3: Números mágicos
def calculate_discount_bad(price, customer_type):
if customer_type == 1:
return price * 0.9
elif customer_type == 2:
return price * 0.85
elif customer_type == 3:
return price * 0.75
return price
# ✅ SOLUCIÓN: Constantes con nombres descriptivos
from enum import Enum
class CustomerType(Enum):
REGULAR = 1
PREMIUM = 2
VIP = 3
DISCOUNT_RATES = {
CustomerType.REGULAR: 0.10,
CustomerType.PREMIUM: 0.15,
CustomerType.VIP: 0.25,
}
def calculate_discount_good(price: float, customer_type: CustomerType) -> float:
discount_rate = DISCOUNT_RATES.get(customer_type, 0)
return price * (1 - discount_rate)
# ❌ CODE SMELL 4: Código duplicado
def send_email_to_customer(customer):
email = customer['email']
subject = "Thank you"
body = "Thanks for your purchase"
# Código de envío...
pass
def send_email_to_supplier(supplier):
email = supplier['email']
subject = "New order"
body = "You have a new order"
# Mismo código de envío...
pass
# ✅ SOLUCIÓN: DRY (Don't Repeat Yourself)
def send_email(recipient_email: str, subject: str, body: str):
"""Función reutilizable para enviar emails"""
# Código de envío centralizado
print(f"Sending to {recipient_email}: {subject}")
def send_email_to_customer(customer):
send_email(
customer['email'],
"Thank you",
"Thanks for your purchase"
)
def send_email_to_supplier(supplier):
send_email(
supplier['email'],
"New order",
"You have a new order"
)
# ❌ CODE SMELL 5: Comentarios que explican QUÉ hace el código
def process_bad(data):
# Crear una lista vacía
result = []
# Iterar sobre cada elemento
for item in data:
# Multiplicar por 2
doubled = item * 2
# Agregar a la lista
result.append(doubled)
return result
# ✅ SOLUCIÓN: Código auto-documentado (los nombres explican el QUÉ)
def double_all_values(numbers):
"""Los comentarios explican el POR QUÉ, no el QUÉ"""
return [number * 2 for number in numbers]
Lista de verificación de Code Smells:
- ❌ Funciones >20 líneas → Extraer métodos
- ❌ Más de 3 parámetros → Objeto de parámetros
- ❌ Números mágicos → Constantes nombradas
- ❌ Código duplicado → DRY principle
- ❌ Comentarios obvios → Nombres descriptivos
- ❌ Clase “Dios” (hace todo) → Separar responsabilidades
- ❌ Feature Envy (usa más datos de otra clase) → Mover lógica
Receta 3.2: Principios SOLID
S - Single Responsibility Principle
# ❌ VIOLACIÓN: Clase hace demasiado
class UserManager:
def create_user(self, data):
# Validar
# Guardar en DB
# Enviar email
# Registrar log
pass
# ✅ CORRECTO: Una responsabilidad por clase
class UserValidator:
def validate(self, user_data):
if not user_data.get('email'):
raise ValueError("Email required")
return True
class UserRepository:
def save(self, user):
# Solo interacción con DB
print(f"Saving user: {user.email}")
return user
class EmailService:
def send_welcome_email(self, user):
# Solo envío de emails
print(f"Sending email to: {user.email}")
class UserService:
"""Orquesta otros servicios"""
def __init__(self):
self.validator = UserValidator()
self.repository = UserRepository()
self.email_service = EmailService()
def create_user(self, user_data):
self.validator.validate(user_data)
user = self.repository.save(user_data)
self.email_service.send_welcome_email(user)
return user
O - Open/Closed Principle
# ❌ VIOLACIÓN: Modificar clase existente para nuevos tipos
class AreaCalculator:
def calculate(self, shape):
if shape.type == 'circle':
return 3.14 * shape.radius ** 2
elif shape.type == 'rectangle':
return shape.width * shape.height
# Agregar nuevo tipo requiere modificar esta clase
# ✅ CORRECTO: Abierto para extensión, cerrado para modificación
from abc import ABC, abstractmethod
class Shape(ABC):
@abstractmethod
def area(self):
pass
class Circle(Shape):
def __init__(self, radius):
self.radius = radius
def area(self):
return 3.14 * self.radius ** 2
class Rectangle(Shape):
def __init__(self, width, height):
self.width = width
self.height = height
def area(self):
return self.width * self.height
class Triangle(Shape): # Nueva forma sin modificar código existente
def __init__(self, base, height):
self.base = base
self.height = height
def area(self):
return 0.5 * self.base * self.height
def calculate_total_area(shapes):
return sum(shape.area() for shape in shapes)
L - Liskov Substitution Principle
# ❌ VIOLACIÓN: Subclase cambia comportamiento esperado
class Bird:
def fly(self):
return "Flying"
class Penguin(Bird):
def fly(self):
raise Exception("Can't fly!") # Rompe contrato
# ✅ CORRECTO: Jerarquía apropiada
class Bird:
def move(self):
pass
class FlyingBird(Bird):
def move(self):
return "Flying"
def fly(self):
return "Flying"
class Penguin(Bird):
def move(self):
return "Swimming"
def swim(self):
return "Swimming"
def make_bird_move(bird: Bird):
print(bird.move()) # Funciona con cualquier Bird
make_bird_move(FlyingBird())
make_bird_move(Penguin())
I - Interface Segregation Principle
# ❌ VIOLACIÓN: Interfaz muy amplia fuerza implementaciones innecesarias
class Worker:
def work(self):
pass
def eat(self):
pass
def sleep(self):
pass
class Robot(Worker):
def work(self):
return "Working"
def eat(self):
pass # Robots no comen, método innecesario
def sleep(self):
pass # Robots no duermen
# ✅ CORRECTO: Interfaces pequeñas y específicas
class Workable:
def work(self):
pass
class Eatable:
def eat(self):
pass
class Sleepable:
def sleep(self):
pass
class Human(Workable, Eatable, Sleepable):
def work(self):
return "Working"
def eat(self):
return "Eating"
def sleep(self):
return "Sleeping"
class Robot(Workable):
def work(self):
return "Working 24/7"
D - Dependency Inversion Principle
# ❌ VIOLACIÓN: Alto nivel depende de bajo nivel
class MySQLDatabase:
def save(self, data):
print("Saving to MySQL")
class UserService:
def __init__(self):
self.db = MySQLDatabase() # Acoplado a implementación concreta
def save_user(self, user):
self.db.save(user)
# ✅ CORRECTO: Ambos dependen de abstracción
class Database(ABC):
@abstractmethod
def save(self, data):
pass
class MySQLDatabase(Database):
def save(self, data):
print(f"Saving to MySQL: {data}")
class PostgreSQLDatabase(Database):
def save(self, data):
print(f"Saving to PostgreSQL: {data}")
class MongoDatabase(Database):
def save(self, data):
print(f"Saving to MongoDB: {data}")
class UserService:
def __init__(self, database: Database): # Depende de abstracción
self.db = database
def save_user(self, user):
self.db.save(user)
# Uso: fácil cambiar implementación
mysql_service = UserService(MySQLDatabase())
postgres_service = UserService(PostgreSQLDatabase())
mongo_service = UserService(MongoDatabase())
Sistemas Operativos y Redes
Receta 4.1: Procesos vs Hilos vs Async
¿Cuál usar cuándo?
| Característica | Procesos | Hilos | Async/Await |
|---|---|---|---|
| Aislamiento | ✅ Total | ❌ Compartido | ❌ Compartido |
| Overhead | 🔴 Alto | 🟡 Medio | 🟢 Bajo |
| Comunicación | IPC complejo | Memoria compartida | Event loop |
| Mejor para | CPU-intensive | I/O + CPU | I/O-intensive |
| GIL (Python) | ✅ Evita | ❌ Afectado | ✅ Evita |
Ejemplo práctico: Descargar múltiples archivos
import time
import asyncio
import aiohttp
from multiprocessing import Process, Pool
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Simular descarga
def download_file_sync(url, file_id):
"""Descarga sincrónica (bloqueante)"""
print(f"Downloading {url}...")
time.sleep(2) # Simular I/O
return f"Downloaded {file_id}"
# 1. SECUENCIAL (baseline)
def download_sequential(urls):
start = time.time()
results = [download_file_sync(url, i) for i, url in enumerate(urls)]
print(f"⏱️ Sequential: {time.time() - start:.2f}s")
return results
# 2. MULTIPROCESSING (CPU-bound)
def download_multiprocess(urls):
start = time.time()
with ProcessPoolExecutor() as executor:
results = list(executor.map(download_file_sync, urls, range(len(urls))))
print(f"⏱️ Multiprocess: {time.time() - start:.2f}s")
return results
# 3. MULTITHREADING (I/O-bound)
def download_multithread(urls):
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(download_file_sync, urls, range(len(urls))))
print(f"⏱️ Multithread: {time.time() - start:.2f}s")
return results
# 4. ASYNC/AWAIT (I/O-bound óptimo)
async def download_file_async(session, url, file_id):
"""Descarga asíncrona (non-blocking)"""
print(f"Downloading {url}...")
await asyncio.sleep(2) # Simular I/O async
return f"Downloaded {file_id}"
async def download_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [download_file_async(session, url, i) for i, url in enumerate(urls)]
results = await asyncio.gather(*tasks)
print(f"⏱️ Async: {time.time() - start:.2f}s")
return results
# Prueba
urls = [f"https://example.com/file{i}.zip" for i in range(10)]
download_sequential(urls) # ~20s (10 archivos × 2s)
download_multiprocess(urls) # ~4-6s (overhead de procesos)
download_multithread(urls) # ~4s (paralelismo real)
asyncio.run(download_async(urls)) # ~2s (todo concurrente)
Caso de uso real: Web scraper
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
"""Scraper asíncrono eficiente"""
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_page(self, session, url):
"""Descargar una página"""
async with self.semaphore: # Limitar concurrencia
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
"""Scrapear múltiples URLs concurrentemente"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def scrape_and_parse(self, urls):
"""Scrape + parseo"""
pages = await self.scrape_urls(urls)
parsed_data = []
for url, html in zip(urls, pages):
if html:
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title').text if soup.find('title') else "No title"
parsed_data.append({
'url': url,
'title': title,
'length': len(html)
})
return parsed_data
# Uso
scraper = AsyncWebScraper(max_concurrent=20)
urls = ["https://example.com", "https://python.org"]
# results = asyncio.run(scraper.scrape_and_parse(urls))
Cuándo usar cada enfoque:
- Secuencial: Pocas operaciones, prototipos, debugging
- Multiprocessing: CPU-intensive (cálculos matemáticos, machine learning)
- Multithreading: I/O-bound con librerías que liberan GIL (requests, file I/O)
- Async/Await: I/O-bound puro (muchas conexiones de red, WebSockets)
Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?
Comparación:
| Característica | TCP | UDP |
|---|---|---|
| Confiabilidad | ✅ Garantizada | ❌ Best effort |
| Orden | ✅ Ordenado | ❌ Puede desordenarse |
| Overhead | 🔴 Alto | 🟢 Bajo |
| Velocidad | 🟡 Media | ✅ Alta |
| Uso de conexión | Orientado a conexión | Sin conexión |
| Casos de uso | HTTP, FTP, Email | DNS, Streaming, Gaming |
Ejemplo: Chat en tiempo real
import socket
import threading
import json
from datetime import datetime
# TCP Chat Server (confiable, ordenado)
class TCPChatServer:
def __init__(self, host='localhost', port=5000):
self.host = host
self.port = port
self.clients = []
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def start(self):
"""Iniciar servidor TCP"""
self.socket.bind((self.host, self.port))
self.socket.listen(5)
print(f"🟢 TCP Server listening on {self.host}:{self.port}")
while True:
client_socket, address = self.socket.accept()
print(f"✓ New connection from {address}")
self.clients.append(client_socket)
# Manejar cliente en thread separado
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
thread.start()
def handle_client(self, client_socket, address):
"""Manejar mensajes de cliente"""
while True:
try:
message = client_socket.recv(1024).decode('utf-8')
if not message:
break
print(f"📨 From {address}: {message}")
# Broadcast a todos los clientes
self.broadcast(message, client_socket)
except Exception as e:
print(f"❌ Error: {e}")
break
client_socket.close()
self.clients.remove(client_socket)
print(f"✗ {address} disconnected")
def broadcast(self, message, sender_socket):
"""Enviar mensaje a todos excepto sender"""
for client in self.clients:
if client != sender_socket:
try:
client.send(message.encode('utf-8'))
except:
client.close()
self.clients.remove(client)
# UDP Game Server (rápido, puede perder paquetes)
class UDPGameServer:
"""Servidor UDP para posiciones de jugadores en tiempo real"""
def __init__(self, host='localhost', port=5001):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((self.host, self.port))
self.players = {} # {address: player_data}
def start(self):
"""Iniciar servidor UDP"""
print(f"🟢 UDP Server listening on {self.host}:{self.port}")
while True:
try:
data, address = self.socket.recvfrom(1024)
message = json.loads(data.decode('utf-8'))
# Actualizar posición del jugador
self.players[address] = {
'position': message.get('position'),
'timestamp': datetime.now().isoformat()
}
# Enviar estado del juego a todos (broadcast)
self.broadcast_game_state()
except Exception as e:
print(f"❌ Error: {e}")
def broadcast_game_state(self):
"""Enviar posiciones de todos los jugadores"""
game_state = {
'players': [
{
'address': str(addr),
'position': data['position']
}
for addr, data in self.players.items()
]
}
message = json.dumps(game_state).encode('utf-8')
for address in self.players.keys():
self.socket.sendto(message, address)
# Cliente TCP
def tcp_client(host='localhost', port=5000):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
while True:
message = input("You: ")
client.send(message.encode('utf-8'))
response = client.recv(1024).decode('utf-8')
print(f"Received: {response}")
# Cliente UDP (enviar posición cada frame)
def udp_game_client(host='localhost', port=5001):
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
position = [0, 0] # x, y
while True:
# Simular movimiento
position[0] += 1
position[1] += 0.5
message = json.dumps({'position': position})
client.sendto(message.encode('utf-8'), (host, port))
# Recibir estado del juego
data, _ = client.recvfrom(4096)
game_state = json.loads(data.decode('utf-8'))
print(f"Players: {len(game_state['players'])}")
time.sleep(0.016) # ~60 FPS
Cuándo usar cada protocolo:
TCP:
- APIs REST (HTTP/HTTPS)
- Transferencia de archivos
- Emails (SMTP, IMAP)
- Bases de datos
- Chat donde cada mensaje importa
UDP:
- Video/Audio streaming
- Gaming online (posiciones de jugadores)
- DNS queries
- VoIP
- Broadcasting/Multicasting
- Telemetría IoT (cuando pérdida ocasional es aceptable)
Receta 4.3: HTTP/HTTPS - Fundamentos Web
Métodos HTTP y casos de uso:
import requests
from flask import Flask, request, jsonify
app = Flask(__name__)
# Base de datos simulada
users = {}
# GET - Obtener recursos (idempotente, cacheable)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""Obtener usuario por ID"""
user = users.get(user_id)
if user:
return jsonify(user), 200
return jsonify({"error": "User not found"}), 404
# POST - Crear nuevo recurso (no idempotente)
@app.route('/users', methods=['POST'])
def create_user():
"""Crear nuevo usuario"""
data = request.json
user_id = len(users) + 1
users[user_id] = {
'id': user_id,
'name': data['name'],
'email': data['email']
}
return jsonify(users[user_id]), 201 # 201 Created
# PUT - Actualizar completamente (idempotente)
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""Reemplazar usuario completamente"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
data = request.json
users[user_id] = {
'id': user_id,
'name': data['name'],
'email': data['email']
}
return jsonify(users[user_id]), 200
# PATCH - Actualizar parcialmente (idempotente)
@app.route('/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
"""Actualizar campos específicos"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
data = request.json
users[user_id].update(data) # Solo actualizar campos provistos
return jsonify(users[user_id]), 200
# DELETE - Eliminar recurso (idempotente)
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""Eliminar usuario"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
del users[user_id]
return '', 204 # 204 No Content
# HEAD - Como GET pero solo headers (para verificar existencia)
@app.route('/users/<int:user_id>', methods=['HEAD'])
def head_user(user_id):
"""Verificar si usuario existe sin traer body"""
if user_id in users:
return '', 200
return '', 404
# OPTIONS - Descubrir métodos soportados
@app.route('/users/<int:user_id>', methods=['OPTIONS'])
def options_user(user_id):
"""Retornar métodos HTTP permitidos"""
return '', 200, {'Allow': 'GET, PUT, PATCH, DELETE, HEAD, OPTIONS'}
if __name__ == '__main__':
app.run(debug=True)
Cliente HTTP con manejo de errores:
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class RobustHTTPClient:
"""Cliente HTTP con retry automático y timeout"""
def __init__(self, base_url, timeout=10, max_retries=3):
self.base_url = base_url
self.timeout = timeout
# Configurar retry strategy
retry_strategy = Retry(
total=max_retries,
backoff_factor=1, # Esperar 1s, 2s, 4s...
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session = requests.Session()
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def get(self, endpoint, **kwargs):
"""GET request con manejo de errores"""
url = f"{self.base_url}{endpoint}"
try:
response = self.session.get(url, timeout=self.timeout, **kwargs)
response.raise_for_status() # Lanzar excepción si 4xx/5xx
return response.json()
except requests.exceptions.Timeout:
print(f"⏱️ Timeout al conectar a {url}")
raise
except requests.exceptions.ConnectionError:
print(f"❌ Error de conexión a {url}")
raise
except requests.exceptions.HTTPError as e:
print(f"❌ HTTP Error: {e.response.status_code}")
raise
def post(self, endpoint, data, **kwargs):
"""POST request"""
url = f"{self.base_url}{endpoint}"
response = self.session.post(
url,
json=data,
timeout=self.timeout,
**kwargs
)
response.raise_for_status()
return response.json()
# Uso
client = RobustHTTPClient("https://api.example.com", timeout=5, max_retries=3)
try:
users = client.get("/users")
new_user = client.post("/users", {"name": "Alice", "email": "alice@example.com"})
except Exception as e:
print(f"Failed: {e}")
Headers importantes:
# Configurar headers comunes
headers = {
# Autenticación
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
# Content negotiation
'Content-Type': 'application/json',
'Accept': 'application/json',
# Caching
'Cache-Control': 'max-age=3600',
'ETag': '"33a64df551425fcc55e4d42a148795d9f25f89d4"',
# CORS
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
# Rate limiting
'X-RateLimit-Limit': '100',
'X-RateLimit-Remaining': '95',
# Security
'Strict-Transport-Security': 'max-age=31536000',
'X-Content-Type-Options': 'nosniff',
}
Status codes esenciales:
- 2xx Éxito: 200 OK, 201 Created, 204 No Content
- 3xx Redirección: 301 Moved Permanently, 302 Found, 304 Not Modified
- 4xx Client Error: 400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found, 429 Too Many Requests
- 5xx Server Error: 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable
¡Felicidades! 🎉
Has completado la FASE 1: Fundamentos Reforzados del roadmap del desarrollador del futuro.
Lo que has aprendido:
✅ Estructuras de datos y algoritmos fundamentales
✅ Patrones de diseño esenciales
✅ Principios de código limpio y refactorización
✅ Fundamentos de sistemas operativos y redes
Próximos pasos:
Continue con la FASE 2: Arquitectura y Diseño de Sistemas para aprender sobre:
- Diseño de software escalable
- Bases de datos avanzadas
- APIs y comunicación entre servicios
Recursos adicionales recomendados:
- 📖 “Clean Code” - Robert C. Martin
- 📖 “Design Patterns” - Gang of Four
- 📖 “Introduction to Algorithms” - CLRS
- 🎥 MIT OpenCourseWare - Algorithms
- 💻 LeetCode / HackerRank para práctica
Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo