🧑‍💻 Developer Cookbook - FASE 1: Fundamentos Reforzados

Recetas prácticas para dominar los fundamentos técnicos del desarrollo de software


📚 Tabla de Contenidos

  1. Estructuras de Datos y Algoritmos
  2. Patrones de Diseño
  3. Lectura y Refactorización de Código
  4. Sistemas Operativos y Redes

Estructuras de Datos y Algoritmos

Receta 1.1: Big O Notation - Midiendo el Rendimiento

¿Qué es? Notación matemática que describe cómo crece el tiempo de ejecución o el uso de memoria de un algoritmo en relación con el tamaño de la entrada.

Casos de uso:

Complejidades comunes:

NotaciónNombreEjemplo
O(1)ConstanteAcceso a array por índice
O(log n)LogarítmicaBúsqueda binaria
O(n)LinealRecorrer un array
O(n log n)Lineal-logarítmicaMerge sort, Quick sort
O(n²)CuadráticaBubble sort (nested loops)
O(2ⁿ)ExponencialFibonacci recursivo ingenuo

Ejemplo práctico:

# ❌ MAL: O(n²) - Buscar duplicados con nested loops
def has_duplicates_slow(arr):
    for i in range(len(arr)):
        for j in range(i + 1, len(arr)):
            if arr[i] == arr[j]:
                return True
    return False

# ✅ BIEN: O(n) - Usar un set
def has_duplicates_fast(arr):
    seen = set()
    for num in arr:
        if num in seen:
            return True
        seen.add(num)
    return False

# Prueba con 10,000 elementos
import time
data = list(range(10000)) + [5000]

start = time.time()
has_duplicates_slow(data)
print(f"Slow: {time.time() - start:.4f}s")  # ~0.5s

start = time.time()
has_duplicates_fast(data)
print(f"Fast: {time.time() - start:.4f}s")  # ~0.001s

Cuándo importa:


Receta 1.2: Arrays vs Listas Enlazadas

¿Qué son?

Comparación de operaciones:

OperaciónArrayLista Enlazada
Acceso por índiceO(1)O(n)
Inserción al inicioO(n)O(1)
Inserción al finalO(1)*O(1)
BúsquedaO(n)O(n)
EliminaciónO(n)O(1)**

*Con capacidad dinámica puede ser O(n) en resize
**Si ya tienes el nodo

Caso de uso: Sistema de deshacer/rehacer

class Node:
    def __init__(self, action):
        self.action = action
        self.prev = None
        self.next = None

class UndoRedoManager:
    """Lista doblemente enlazada para deshacer/rehacer acciones"""
    def __init__(self):
        self.current = None
        self.head = None
    
    def do_action(self, action):
        """Ejecutar nueva acción"""
        node = Node(action)
        
        if self.current:
            # Eliminar cualquier acción "redo" disponible
            self.current.next = node
            node.prev = self.current
        else:
            self.head = node
        
        self.current = node
        action.execute()
    
    def undo(self):
        """Deshacer última acción - O(1)"""
        if not self.current:
            return "Nothing to undo"
        
        self.current.action.rollback()
        self.current = self.current.prev
        return "Undone"
    
    def redo(self):
        """Rehacer acción - O(1)"""
        if not self.current or not self.current.next:
            return "Nothing to redo"
        
        self.current = self.current.next
        self.current.action.execute()
        return "Redone"

# Ejemplo de uso
class TextAction:
    def __init__(self, text):
        self.text = text
        self.document = []
    
    def execute(self):
        self.document.append(self.text)
        print(f"Added: {self.text}")
    
    def rollback(self):
        self.document.pop()
        print(f"Removed: {self.text}")

manager = UndoRedoManager()
manager.do_action(TextAction("Hello"))
manager.do_action(TextAction("World"))
manager.undo()  # Removed: World
manager.redo()  # Added: World

Cuándo usar cada uno:


Receta 1.3: Stacks y Queues - LIFO vs FIFO

¿Qué son?

Caso de uso real: Sistema de procesamiento de tareas

from collections import deque
from datetime import datetime

class TaskQueue:
    """Queue para procesamiento FIFO de tareas"""
    def __init__(self):
        self.queue = deque()
    
    def enqueue(self, task):
        """Agregar tarea al final"""
        self.queue.append(task)
        print(f"📥 Enqueued: {task['name']}")
    
    def dequeue(self):
        """Procesar siguiente tarea (la más antigua)"""
        if not self.queue:
            return None
        task = self.queue.popleft()
        print(f"⚙️  Processing: {task['name']}")
        return task

class CallStack:
    """Stack para rastrear ejecución de funciones"""
    def __init__(self):
        self.stack = []
    
    def push(self, function_name):
        """Entrar a una función"""
        self.stack.append({
            'name': function_name,
            'timestamp': datetime.now()
        })
        print(f"→ Entering: {function_name}")
    
    def pop(self):
        """Salir de una función"""
        if not self.stack:
            return None
        func = self.stack.pop()
        duration = (datetime.now() - func['timestamp']).total_seconds()
        print(f"← Exiting: {func['name']} ({duration:.3f}s)")
        return func
    
    def peek(self):
        """Ver función actual sin removerla"""
        return self.stack[-1] if self.stack else None

# Ejemplo 1: Task Queue (FIFO)
print("=== QUEUE Example ===")
task_queue = TaskQueue()
task_queue.enqueue({'name': 'Send email', 'priority': 1})
task_queue.enqueue({'name': 'Generate report', 'priority': 2})
task_queue.enqueue({'name': 'Backup database', 'priority': 3})

task_queue.dequeue()  # Procesa "Send email" primero
task_queue.dequeue()  # Luego "Generate report"

# Ejemplo 2: Call Stack (LIFO)
print("\n=== STACK Example ===")
call_stack = CallStack()

def function_a():
    call_stack.push('function_a')
    function_b()
    call_stack.pop()

def function_b():
    call_stack.push('function_b')
    function_c()
    call_stack.pop()

def function_c():
    call_stack.push('function_c')
    # Hacer algo
    call_stack.pop()

function_a()
# Output muestra el orden LIFO de ejecución

Aplicaciones reales:


Receta 1.4: Árboles Binarios de Búsqueda (BST)

¿Qué es? Estructura jerárquica donde cada nodo tiene máximo 2 hijos, y para cada nodo: todos los valores a la izquierda son menores, todos a la derecha son mayores.

Ventajas:

Caso de uso: Sistema de autocompletado

class TreeNode:
    def __init__(self, word, frequency=1):
        self.word = word
        self.frequency = frequency
        self.left = None
        self.right = None

class AutocompleteSystem:
    """BST para sugerencias de búsqueda"""
    def __init__(self):
        self.root = None
    
    def insert(self, word):
        """Agregar palabra al diccionario"""
        if not self.root:
            self.root = TreeNode(word)
            return
        
        current = self.root
        while True:
            if word < current.word:
                if current.left is None:
                    current.left = TreeNode(word)
                    break
                current = current.left
            elif word > current.word:
                if current.right is None:
                    current.right = TreeNode(word)
                    break
                current = current.right
            else:
                # Palabra ya existe, incrementar frecuencia
                current.frequency += 1
                break
    
    def search_prefix(self, prefix):
        """Encontrar todas las palabras con el prefijo"""
        results = []
        
        def traverse(node):
            if not node:
                return
            
            if node.word.startswith(prefix):
                results.append((node.word, node.frequency))
            
            # Solo explorar ramas relevantes
            if prefix <= node.word:
                traverse(node.left)
            if prefix >= node.word[:len(prefix)]:
                traverse(node.right)
        
        traverse(self.root)
        # Ordenar por frecuencia (más populares primero)
        return sorted(results, key=lambda x: x[1], reverse=True)

# Ejemplo de uso
autocomplete = AutocompleteSystem()

# Insertar palabras con sus búsquedas
searches = ['python', 'javascript', 'java', 'python', 'php', 
            'python', 'javascript', 'java', 'ruby', 'python']

for search in searches:
    autocomplete.insert(search)

# Autocompletar "py"
suggestions = autocomplete.search_prefix('py')
print("Suggestions for 'py':", suggestions)
# Output: [('python', 4), ('php', 1)]

# Autocompletar "ja"
suggestions = autocomplete.search_prefix('ja')
print("Suggestions for 'ja':", suggestions)
# Output: [('java', 2), ('javascript', 2)]

Cuándo NO usar BST:


Receta 1.5: Hash Tables - El poder del O(1)

¿Qué es? Estructura que mapea keys a values usando una función hash, permitiendo búsqueda, inserción y eliminación en tiempo constante promedio.

Caso de uso: Cache de resultados costosos

import time
import hashlib

class CacheWithExpiry:
    """Hash table con expiración de entradas"""
    def __init__(self, ttl_seconds=60):
        self.cache = {}
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0
    
    def _hash_key(self, key):
        """Generar hash consistente para cualquier tipo de key"""
        return hashlib.md5(str(key).encode()).hexdigest()
    
    def get(self, key):
        """Obtener valor del cache"""
        hashed_key = self._hash_key(key)
        
        if hashed_key in self.cache:
            entry = self.cache[hashed_key]
            # Verificar si expiró
            if time.time() - entry['timestamp'] < self.ttl:
                self.hits += 1
                print(f"✓ Cache HIT for {key}")
                return entry['value']
            else:
                # Expiró, eliminar
                del self.cache[hashed_key]
        
        self.misses += 1
        print(f"✗ Cache MISS for {key}")
        return None
    
    def set(self, key, value):
        """Guardar en cache"""
        hashed_key = self._hash_key(key)
        self.cache[hashed_key] = {
            'value': value,
            'timestamp': time.time()
        }
        print(f"💾 Cached: {key}")
    
    def stats(self):
        """Estadísticas del cache"""
        total = self.hits + self.misses
        hit_rate = (self.hits / total * 100) if total > 0 else 0
        return f"Hits: {self.hits}, Misses: {self.misses}, Rate: {hit_rate:.1f}%"

# Simulación de función costosa
def expensive_computation(n):
    """Simula cálculo pesado (ej: consulta a DB)"""
    print(f"⏳ Computing fibonacci({n})...")
    time.sleep(0.5)  # Simular delay
    
    if n <= 1:
        return n
    
    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b

# Usar cache
cache = CacheWithExpiry(ttl_seconds=5)

# Primera llamada - MISS
result = cache.get('fib_10')
if result is None:
    result = expensive_computation(10)
    cache.set('fib_10', result)

# Segunda llamada - HIT
result = cache.get('fib_10')

# Esperar expiración
print("\n⏰ Waiting for cache expiry...")
time.sleep(6)

# Tercera llamada - MISS (expiró)
result = cache.get('fib_10')
if result is None:
    result = expensive_computation(10)
    cache.set('fib_10', result)

print(f"\n📊 {cache.stats()}")

Colisiones y resolución:

class SimpleHashTable:
    """Hash table con chaining para manejar colisiones"""
    def __init__(self, size=10):
        self.size = size
        self.table = [[] for _ in range(size)]
    
    def _hash(self, key):
        """Simple hash function"""
        return sum(ord(c) for c in str(key)) % self.size
    
    def insert(self, key, value):
        """Insertar con manejo de colisiones"""
        index = self._hash(key)
        bucket = self.table[index]
        
        # Actualizar si existe
        for i, (k, v) in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)
                return
        
        # Agregar nuevo
        bucket.append((key, value))
        print(f"Inserted '{key}' at index {index} (bucket size: {len(bucket)})")
    
    def get(self, key):
        """Buscar valor"""
        index = self._hash(key)
        bucket = self.table[index]
        
        for k, v in bucket:
            if k == key:
                return v
        return None

# Demostración de colisiones
ht = SimpleHashTable(size=5)
ht.insert("cat", "🐱")  # hash = 3
ht.insert("act", "🎭")  # hash = 3 (colisión!)
ht.insert("dog", "🐶")  # hash = 2

print(ht.get("cat"))  # 🐱
print(ht.get("act"))  # 🎭

Aplicaciones reales:


Receta 1.6: Búsqueda Binaria - Dividir y Conquistar

¿Qué es? Algoritmo de búsqueda en arrays ordenados que divide el espacio de búsqueda a la mitad en cada paso (O(log n)).

Caso de uso: Búsqueda en logs ordenados por timestamp

from datetime import datetime, timedelta
import bisect

class LogSearcher:
    """Búsqueda eficiente en logs ordenados por tiempo"""
    def __init__(self):
        self.logs = []
        self.timestamps = []
    
    def add_log(self, timestamp, message):
        """Agregar log manteniendo orden"""
        # bisect.insort es búsqueda binaria + inserción
        bisect.insort(self.timestamps, timestamp)
        index = bisect.bisect_left(self.timestamps, timestamp)
        self.logs.insert(index, {'time': timestamp, 'msg': message})
    
    def find_logs_in_range(self, start_time, end_time):
        """Encontrar logs en rango de tiempo - O(log n)"""
        # Encontrar índices con búsqueda binaria
        start_idx = bisect.bisect_left(self.timestamps, start_time)
        end_idx = bisect.bisect_right(self.timestamps, end_time)
        
        return self.logs[start_idx:end_idx]
    
    def find_first_error_after(self, timestamp):
        """Encontrar primer error después de cierto tiempo"""
        idx = bisect.bisect_left(self.timestamps, timestamp)
        
        for i in range(idx, len(self.logs)):
            if 'ERROR' in self.logs[i]['msg']:
                return self.logs[i]
        return None

# Ejemplo de uso
searcher = LogSearcher()
base_time = datetime(2024, 1, 1, 10, 0, 0)

# Agregar logs
logs_data = [
    (0, "INFO: Server started"),
    (5, "INFO: Connection established"),
    (10, "WARNING: High memory usage"),
    (15, "ERROR: Connection timeout"),
    (20, "INFO: Retry successful"),
    (30, "ERROR: Database error"),
]

for offset, msg in logs_data:
    searcher.add_log(base_time + timedelta(seconds=offset), msg)

# Buscar logs en ventana de tiempo
start = base_time + timedelta(seconds=10)
end = base_time + timedelta(seconds=20)
results = searcher.find_logs_in_range(start, end)

print("Logs between 10s and 20s:")
for log in results:
    print(f"  {log['time'].strftime('%H:%M:%S')} - {log['msg']}")

# Encontrar primer error después de los 12 segundos
error = searcher.find_first_error_after(base_time + timedelta(seconds=12))
print(f"\nFirst error after 12s: {error['msg']}")

Implementación clásica:

def binary_search(arr, target):
    """Búsqueda binaria iterativa"""
    left, right = 0, len(arr) - 1
    
    while left <= right:
        mid = (left + right) // 2
        
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    
    return -1  # No encontrado

def binary_search_recursive(arr, target, left=0, right=None):
    """Búsqueda binaria recursiva"""
    if right is None:
        right = len(arr) - 1
    
    if left > right:
        return -1
    
    mid = (left + right) // 2
    
    if arr[mid] == target:
        return mid
    elif arr[mid] < target:
        return binary_search_recursive(arr, target, mid + 1, right)
    else:
        return binary_search_recursive(arr, target, left, mid - 1)

# Prueba
sorted_array = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
print(binary_search(sorted_array, 7))  # 3
print(binary_search(sorted_array, 8))  # -1

Variaciones útiles:


Receta 1.7: Algoritmos de Ordenamiento

Comparación rápida:

AlgoritmoTiempo PromedioPeor CasoEspacioEstable?
Bubble SortO(n²)O(n²)O(1)✅ Sí
Selection SortO(n²)O(n²)O(1)❌ No
Insertion SortO(n²)O(n²)O(1)✅ Sí
Merge SortO(n log n)O(n log n)O(n)✅ Sí
Quick SortO(n log n)O(n²)O(log n)❌ No
Heap SortO(n log n)O(n log n)O(1)❌ No

Caso de uso: Ordenamiento personalizado de productos

from dataclasses import dataclass
from typing import List
import random

@dataclass
class Product:
    name: str
    price: float
    rating: float
    sales: int
    
    def __repr__(self):
        return f"{self.name} (${self.price}, ⭐{self.rating}, 🛒{self.sales})"

def merge_sort_custom(arr: List[Product], key_func) -> List[Product]:
    """Merge Sort con función de comparación personalizada"""
    if len(arr) <= 1:
        return arr
    
    mid = len(arr) // 2
    left = merge_sort_custom(arr[:mid], key_func)
    right = merge_sort_custom(arr[mid:], key_func)
    
    return merge(left, right, key_func)

def merge(left, right, key_func):
    """Combinar dos arrays ordenados"""
    result = []
    i = j = 0
    
    while i < len(left) and j < len(right):
        if key_func(left[i]) <= key_func(right[j]):
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    
    result.extend(left[i:])
    result.extend(right[j:])
    return result

def quick_sort_custom(arr: List[Product], key_func) -> List[Product]:
    """Quick Sort con función personalizada"""
    if len(arr) <= 1:
        return arr
    
    pivot = arr[len(arr) // 2]
    pivot_value = key_func(pivot)
    
    left = [x for x in arr if key_func(x) < pivot_value]
    middle = [x for x in arr if key_func(x) == pivot_value]
    right = [x for x in arr if key_func(x) > pivot_value]
    
    return quick_sort_custom(left, key_func) + middle + quick_sort_custom(right, key_func)

# Crear productos de ejemplo
products = [
    Product("Laptop Pro", 1299.99, 4.8, 523),
    Product("Mouse Wireless", 29.99, 4.2, 1250),
    Product("Keyboard RGB", 79.99, 4.5, 892),
    Product("Monitor 4K", 449.99, 4.7, 234),
    Product("Webcam HD", 89.99, 3.9, 678),
]

# Ordenar por diferentes criterios
print("=== Ordenar por PRECIO (ascendente) ===")
sorted_price = merge_sort_custom(products, lambda p: p.price)
for p in sorted_price:
    print(p)

print("\n=== Ordenar por RATING (descendente) ===")
sorted_rating = quick_sort_custom(products, lambda p: -p.rating)
for p in sorted_rating:
    print(p)

print("\n=== Ordenar por POPULARIDAD (descendente) ===")
sorted_sales = merge_sort_custom(products, lambda p: -p.sales)
for p in sorted_sales:
    print(p)

print("\n=== Ordenar por SCORE COMPUESTO ===")
# Score: 40% rating, 30% sales, 30% precio inverso
def popularity_score(p):
    max_sales = max(prod.sales for prod in products)
    max_price = max(prod.price for prod in products)
    
    return (
        p.rating * 0.4 +
        (p.sales / max_sales) * 0.3 +
        (1 - p.price / max_price) * 0.3
    )

sorted_score = merge_sort_custom(products, lambda p: -popularity_score(p))
for p in sorted_score:
    print(f"{p} - Score: {popularity_score(p):.3f}")

Cuándo usar cada uno:


Receta 1.8: Programación Dinámica - Fibonacci Optimizado

¿Qué es? Técnica para resolver problemas complejos dividiéndolos en subproblemas más simples y almacenando resultados para evitar recálculos.

Problema clásico: Fibonacci

import time
from functools import lru_cache

# ❌ ENFOQUE INGENUO - O(2^n)
def fibonacci_naive(n):
    """Recursivo sin optimización - EXTREMADAMENTE LENTO"""
    if n <= 1:
        return n
    return fibonacci_naive(n - 1) + fibonacci_naive(n - 2)

# ✅ TOP-DOWN (Memoization) - O(n)
@lru_cache(maxsize=None)
def fibonacci_memo(n):
    """Recursivo con caché automático"""
    if n <= 1:
        return n
    return fibonacci_memo(n - 1) + fibonacci_memo(n - 2)

# ✅ BOTTOM-UP (Tabulation) - O(n), más eficiente en espacio
def fibonacci_dp(n):
    """Iterativo construyendo desde abajo"""
    if n <= 1:
        return n
    
    dp = [0] * (n + 1)
    dp[1] = 1
    
    for i in range(2, n + 1):
        dp[i] = dp[i-1] + dp[i-2]
    
    return dp[n]

# ✅ OPTIMIZACIÓN ESPACIAL - O(1) espacio
def fibonacci_optimized(n):
    """Solo mantenemos los últimos 2 valores"""
    if n <= 1:
        return n
    
    prev, curr = 0, 1
    for _ in range(2, n + 1):
        prev, curr = curr, prev + curr
    
    return curr

# Comparación de rendimiento
def benchmark(func, n, name):
    start = time.time()
    result = func(n)
    duration = time.time() - start
    print(f"{name:20} n={n}: {result:,} ({duration:.6f}s)")

print("=== Comparación de Performance ===")
for n in [10, 20, 30]:
    print(f"\nCalculando Fibonacci({n}):")
    if n <= 30:  # Naive es muy lento para n>30
        benchmark(fibonacci_naive, n, "Naive (recursivo)")
    benchmark(fibonacci_memo, n, "Memoization")
    benchmark(fibonacci_dp, n, "DP Tabulation")
    benchmark(fibonacci_optimized, n, "Optimizado")

fibonacci_memo.cache_clear()  # Limpiar cache entre pruebas

Caso de uso real: Problema de la mochila (Knapsack)

def knapsack_dp(weights, values, capacity):
    """
    Problema de la mochila: maximizar valor sin exceder capacidad.
    Usado en: optimización de recursos, presupuestos, scheduling
    """
    n = len(weights)
    # dp[i][w] = valor máximo con primeros i items y capacidad w
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]
    
    for i in range(1, n + 1):
        for w in range(capacity + 1):
            # Opción 1: No incluir item i
            dp[i][w] = dp[i-1][w]
            
            # Opción 2: Incluir item i (si cabe)
            if weights[i-1] <= w:
                dp[i][w] = max(
                    dp[i][w],
                    dp[i-1][w - weights[i-1]] + values[i-1]
                )
    
    return dp[n][capacity]

# Ejemplo: Optimizar carga de un servidor
tasks = {
    "DB Backup": {"cpu": 2, "value": 10},      # 2 CPUs, prioridad 10
    "Analytics": {"cpu": 3, "value": 15},      # 3 CPUs, prioridad 15
    "API Server": {"cpu": 1, "value": 20},     # 1 CPU, prioridad 20
    "Cache Warming": {"cpu": 2, "value": 8},   # 2 CPUs, prioridad 8
    "Log Processing": {"cpu": 1, "value": 5},  # 1 CPU, prioridad 5
}

weights = [task["cpu"] for task in tasks.values()]
values = [task["value"] for task in tasks.values()]
max_cpu = 5  # Capacidad del servidor

max_value = knapsack_dp(weights, values, max_cpu)
print(f"Valor máximo optimizado: {max_value}")
print(f"Con capacidad de {max_cpu} CPUs")

Otros problemas comunes de DP:


Patrones de Diseño

Receta 2.1: Singleton - Una única instancia

¿Qué es? Patrón que garantiza que una clase tenga solo una instancia y proporciona un punto de acceso global a ella.

Caso de uso: Configuración global de aplicación

import threading
from typing import Any, Dict

class ConfigurationManager:
    """Singleton thread-safe para gestión de configuración"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                # Double-check locking
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        # Inicializar solo una vez
        if not self._initialized:
            self._config: Dict[str, Any] = {
                'database': {
                    'host': 'localhost',
                    'port': 5432,
                    'pool_size': 10
                },
                'cache': {
                    'ttl': 300,
                    'max_size': 1000
                },
                'api': {
                    'rate_limit': 100,
                    'timeout': 30
                }
            }
            self._initialized = True
    
    def get(self, key: str, default=None):
        """Obtener valor de configuración con notación dot"""
        keys = key.split('.')
        value = self._config
        
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return default
            
            if value is None:
                return default
        
        return value
    
    def set(self, key: str, value: Any):
        """Establecer valor de configuración"""
        keys = key.split('.')
        config = self._config
        
        for k in keys[:-1]:
            config = config.setdefault(k, {})
        
        config[keys[-1]] = value

# Demostración
config1 = ConfigurationManager()
config2 = ConfigurationManager()

print(f"Son la misma instancia: {config1 is config2}")  # True
print(f"ID config1: {id(config1)}")
print(f"ID config2: {id(config2)}")

# Configurar desde cualquier parte
config1.set('database.host', 'production.db.com')
config1.set('api.rate_limit', 500)

# Acceder desde cualquier parte
print(f"\nDatabase host: {config2.get('database.host')}")
print(f"API rate limit: {config2.get('api.rate_limit')}")
print(f"Cache TTL: {config2.get('cache.ttl')}")

# Simular uso en diferentes módulos
def module_a():
    config = ConfigurationManager()
    return config.get('database.pool_size')

def module_b():
    config = ConfigurationManager()
    config.set('database.pool_size', 20)

module_b()
print(f"\nPool size desde module_a: {module_a()}")  # 20

Alternativa moderna: Módulo como Singleton

# config.py
class _Config:
    def __init__(self):
        self.database_url = "postgresql://localhost/mydb"
        self.debug = True
        self.secret_key = "dev-secret-123"

# Crear instancia única a nivel de módulo
config = _Config()

# En otros archivos, simplemente:
# from config import config
# print(config.database_url)

Cuándo usar:

Cuándo NO usar:


Receta 2.2: Factory - Creación de objetos flexible

¿Qué es? Patrón que delega la creación de objetos a subclases o métodos especializados, permitiendo crear objetos sin especificar la clase exacta.

Caso de uso: Parsers de diferentes formatos

from abc import ABC, abstractmethod
import json
import yaml
import csv
from io import StringIO

class DataParser(ABC):
    """Interfaz base para parsers"""
    
    @abstractmethod
    def parse(self, data: str) -> dict:
        """Parsear datos al formato estándar"""
        pass
    
    @abstractmethod
    def serialize(self, data: dict) -> str:
        """Serializar datos del formato estándar"""
        pass

class JSONParser(DataParser):
    def parse(self, data: str) -> dict:
        return json.loads(data)
    
    def serialize(self, data: dict) -> str:
        return json.dumps(data, indent=2)

class YAMLParser(DataParser):
    def parse(self, data: str) -> dict:
        return yaml.safe_load(data)
    
    def serialize(self, data: dict) -> str:
        return yaml.dump(data, default_flow_style=False)

class CSVParser(DataParser):
    def parse(self, data: str) -> dict:
        """Convertir CSV a dict con primera fila como keys"""
        reader = csv.DictReader(StringIO(data))
        return {"rows": list(reader)}
    
    def serialize(self, data: dict) -> str:
        """Convertir dict a CSV"""
        if not data.get("rows"):
            return ""
        
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=data["rows"][0].keys())
        writer.writeheader()
        writer.writerows(data["rows"])
        return output.getvalue()

class ParserFactory:
    """Factory para crear parsers según el tipo"""
    _parsers = {
        'json': JSONParser,
        'yaml': YAMLParser,
        'yml': YAMLParser,
        'csv': CSVParser,
    }
    
    @classmethod
    def create_parser(cls, format_type: str) -> DataParser:
        """Crear parser apropiado"""
        parser_class = cls._parsers.get(format_type.lower())
        
        if not parser_class:
            raise ValueError(f"Formato no soportado: {format_type}")
        
        return parser_class()
    
    @classmethod
    def register_parser(cls, format_type: str, parser_class):
        """Registrar nuevo tipo de parser (extensibilidad)"""
        cls._parsers[format_type] = parser_class

# Uso del factory
def process_data_file(filename: str, content: str):
    """Procesar archivo sin saber su formato de antemano"""
    # Detectar formato por extensión
    extension = filename.split('.')[-1]
    
    # Factory crea el parser apropiado
    parser = ParserFactory.create_parser(extension)
    
    # Usar el parser de forma polimórfica
    data = parser.parse(content)
    print(f"\n=== Procesando {filename} ===")
    print(f"Tipo de parser: {parser.__class__.__name__}")
    print(f"Datos parseados: {data}")
    
    # Serializar de vuelta
    serialized = parser.serialize(data)
    print(f"Serializado:\n{serialized[:100]}...")

# Ejemplos
json_content = '{"name": "John", "age": 30, "city": "NYC"}'
process_data_file("user.json", json_content)

yaml_content = """
name: Jane
age: 25
city: SF
"""
process_data_file("user.yaml", yaml_content)

csv_content = """name,age,city
Bob,35,LA
Alice,28,Chicago"""
process_data_file("users.csv", csv_content)

# Extensibilidad: Agregar nuevo formato
class XMLParser(DataParser):
    def parse(self, data: str) -> dict:
        # Implementación simplificada
        return {"xml": "parsed data"}
    
    def serialize(self, data: dict) -> str:
        return "<data>serialized</data>"

ParserFactory.register_parser('xml', XMLParser)
process_data_file("data.xml", "<root><item>test</item></root>")

Factory Method vs Abstract Factory:

# Factory Method: Una familia de productos
class NotificationFactory:
    @staticmethod
    def create(channel: str):
        if channel == 'email':
            return EmailNotification()
        elif channel == 'sms':
            return SMSNotification()
        elif channel == 'push':
            return PushNotification()

# Abstract Factory: Familias completas de productos relacionados
class CloudProviderFactory(ABC):
    @abstractmethod
    def create_compute(self):
        pass
    
    @abstractmethod
    def create_storage(self):
        pass

class AWSFactory(CloudProviderFactory):
    def create_compute(self):
        return EC2Instance()
    
    def create_storage(self):
        return S3Bucket()

class AzureFactory(CloudProviderFactory):
    def create_compute(self):
        return VirtualMachine()
    
    def create_storage(self):
        return BlobStorage()

Receta 2.3: Observer - Notificación de cambios

¿Qué es? Patrón que define una dependencia uno-a-muchos entre objetos, de modo que cuando un objeto cambia de estado, todos sus dependientes son notificados automáticamente.

Caso de uso: Sistema de eventos en una aplicación

from abc import ABC, abstractmethod
from typing import List, Any
from datetime import datetime

class Observer(ABC):
    """Interfaz para observadores"""
    
    @abstractmethod
    def update(self, event: str, data: Any):
        """Recibir notificación de cambio"""
        pass

class Subject:
    """Sujeto observable"""
    
    def __init__(self):
        self._observers: List[Observer] = []
    
    def attach(self, observer: Observer):
        """Registrar observador"""
        if observer not in self._observers:
            self._observers.append(observer)
            print(f"✓ {observer.__class__.__name__} attached")
    
    def detach(self, observer: Observer):
        """Eliminar observador"""
        self._observers.remove(observer)
        print(f"✗ {observer.__class__.__name__} detached")
    
    def notify(self, event: str, data: Any = None):
        """Notificar a todos los observadores"""
        print(f"\n📢 Notificando evento: {event}")
        for observer in self._observers:
            observer.update(event, data)

class UserAccount(Subject):
    """Cuenta de usuario observable"""
    
    def __init__(self, username: str):
        super().__init__()
        self.username = username
        self.balance = 0
    
    def deposit(self, amount: float):
        """Depositar dinero"""
        self.balance += amount
        self.notify('deposit', {
            'username': self.username,
            'amount': amount,
            'new_balance': self.balance,
            'timestamp': datetime.now()
        })
    
    def withdraw(self, amount: float):
        """Retirar dinero"""
        if amount > self.balance:
            self.notify('insufficient_funds', {
                'username': self.username,
                'attempted': amount,
                'available': self.balance
            })
            return False
        
        self.balance -= amount
        self.notify('withdrawal', {
            'username': self.username,
            'amount': amount,
            'new_balance': self.balance,
            'timestamp': datetime.now()
        })
        return True

# Observadores concretos
class EmailNotifier(Observer):
    """Envía emails en eventos importantes"""
    
    def update(self, event: str, data: Any):
        if event == 'deposit' and data['amount'] >= 1000:
            print(f"📧 EMAIL: Large deposit of ${data['amount']:.2f} detected for {data['username']}")
        elif event == 'insufficient_funds':
            print(f"📧 EMAIL: Insufficient funds alert for {data['username']}")

class SMSNotifier(Observer):
    """Envía SMS para retiros"""
    
    def update(self, event: str, data: Any):
        if event == 'withdrawal':
            print(f"📱 SMS: ${data['amount']:.2f} withdrawn from your account. New balance: ${data['new_balance']:.2f}")

class AuditLogger(Observer):
    """Registra todas las transacciones"""
    
    def __init__(self):
        self.logs = []
    
    def update(self, event: str, data: Any):
        log_entry = {
            'event': event,
            'data': data,
            'logged_at': datetime.now()
        }
        self.logs.append(log_entry)
        print(f"📝 AUDIT: Logged {event} event")

class FraudDetector(Observer):
    """Detecta actividad sospechosa"""
    
    def __init__(self):
        self.transaction_count = {}
    
    def update(self, event: str, data: Any):
        if event in ['deposit', 'withdrawal']:
            username = data['username']
            self.transaction_count[username] = self.transaction_count.get(username, 0) + 1
            
            if self.transaction_count[username] >= 3:
                print(f"🚨 FRAUD ALERT: Multiple transactions ({self.transaction_count[username]}) from {username}")

# Demostración
print("=== Sistema de Observadores ===")
account = UserAccount("john_doe")

# Registrar observadores
email_notifier = EmailNotifier()
sms_notifier = SMSNotifier()
audit_logger = AuditLogger()
fraud_detector = FraudDetector()

account.attach(email_notifier)
account.attach(sms_notifier)
account.attach(audit_logger)
account.attach(fraud_detector)

# Realizar transacciones
account.deposit(500)
account.deposit(1500)  # Triggerea email por cantidad grande
account.withdraw(300)
account.withdraw(100)
account.withdraw(50)   # Triggerea fraud alert
account.withdraw(5000) # Insufficient funds

# Desregistrar un observador
print("\n--- Removiendo SMS Notifier ---")
account.detach(sms_notifier)
account.deposit(200)  # Solo los otros observadores se notifican

# Ver logs de auditoría
print(f"\n📊 Total de logs de auditoría: {len(audit_logger.logs)}")

Aplicaciones reales:


Receta 2.4: Strategy - Algoritmos intercambiables

¿Qué es? Patrón que define una familia de algoritmos, encapsula cada uno y los hace intercambiables, permitiendo que el algoritmo varíe independientemente de los clientes que lo usan.

Caso de uso: Sistema de precios con diferentes estrategias

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, date
from typing import List

@dataclass
class Product:
    name: str
    base_price: float
    category: str

@dataclass
class Customer:
    name: str
    loyalty_level: str  # 'bronze', 'silver', 'gold', 'platinum'
    member_since: date

class PricingStrategy(ABC):
    """Estrategia base de pricing"""
    
    @abstractmethod
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        """Calcular precio final"""
        pass
    
    @abstractmethod
    def get_description(self) -> str:
        """Descripción de la estrategia"""
        pass

class RegularPricing(PricingStrategy):
    """Sin descuentos"""
    
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        return product.base_price * quantity
    
    def get_description(self) -> str:
        return "Precio regular sin descuentos"

class LoyaltyPricing(PricingStrategy):
    """Descuentos basados en nivel de lealtad"""
    
    DISCOUNTS = {
        'bronze': 0.05,    # 5%
        'silver': 0.10,    # 10%
        'gold': 0.15,      # 15%
        'platinum': 0.20   # 20%
    }
    
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        base_total = product.base_price * quantity
        discount = self.DISCOUNTS.get(customer.loyalty_level, 0)
        return base_total * (1 - discount)
    
    def get_description(self) -> str:
        return "Descuento por nivel de lealtad del cliente"

class BulkPricing(PricingStrategy):
    """Descuentos por volumen"""
    
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        base_total = product.base_price * quantity
        
        if quantity >= 100:
            discount = 0.20  # 20% para 100+
        elif quantity >= 50:
            discount = 0.15  # 15% para 50+
        elif quantity >= 20:
            discount = 0.10  # 10% para 20+
        else:
            discount = 0
        
        return base_total * (1 - discount)
    
    def get_description(self) -> str:
        return "Descuento por cantidad (20+ items: 10%, 50+: 15%, 100+: 20%)"

class SeasonalPricing(PricingStrategy):
    """Descuentos por temporada"""
    
    def __init__(self):
        self.current_month = datetime.now().month
        # Temporada alta: junio-agosto, temporada baja: diciembre-febrero
        self.high_season = [6, 7, 8]
        self.low_season = [12, 1, 2]
    
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        base_total = product.base_price * quantity
        
        if self.current_month in self.high_season:
            # Precio premium en temporada alta
            return base_total * 1.20
        elif self.current_month in self.low_season:
            # Descuento en temporada baja
            return base_total * 0.80
        else:
            return base_total
    
    def get_description(self) -> str:
        return "Ajuste de precios por temporada (+20% alta, -20% baja)"

class ComboStrategy(PricingStrategy):
    """Combina múltiples estrategias tomando el mejor precio"""
    
    def __init__(self, strategies: List[PricingStrategy]):
        self.strategies = strategies
    
    def calculate_price(self, product: Product, customer: Customer, quantity: int) -> float:
        # Aplicar todas las estrategias y tomar el precio más bajo
        prices = [
            strategy.calculate_price(product, customer, quantity)
            for strategy in self.strategies
        ]
        return min(prices)
    
    def get_description(self) -> str:
        strategy_names = [s.__class__.__name__ for s in self.strategies]
        return f"Mejor precio entre: {', '.join(strategy_names)}"

class ShoppingCart:
    """Carrito con estrategia de pricing intercambiable"""
    
    def __init__(self, customer: Customer, pricing_strategy: PricingStrategy):
        self.customer = customer
        self.pricing_strategy = pricing_strategy
        self.items: List[tuple] = []  # (product, quantity)
    
    def add_item(self, product: Product, quantity: int):
        """Agregar producto al carrito"""
        self.items.append((product, quantity))
    
    def set_pricing_strategy(self, strategy: PricingStrategy):
        """Cambiar estrategia de pricing"""
        self.pricing_strategy = strategy
        print(f"\n🔄 Estrategia cambiada a: {strategy.get_description()}")
    
    def calculate_total(self) -> float:
        """Calcular total con la estrategia actual"""
        total = 0
        for product, quantity in self.items:
            item_price = self.pricing_strategy.calculate_price(product, self.customer, quantity)
            total += item_price
        return total
    
    def show_receipt(self):
        """Mostrar recibo detallado"""
        print(f"\n{'='*60}")
        print(f"Cliente: {self.customer.name} ({self.customer.loyalty_level.upper()})")
        print(f"Estrategia: {self.pricing_strategy.get_description()}")
        print(f"{'='*60}")
        
        total = 0
        for product, quantity in self.items:
            base_total = product.base_price * quantity
            final_price = self.pricing_strategy.calculate_price(product, self.customer, quantity)
            discount = base_total - final_price
            
            print(f"{product.name:30} x{quantity:3}")
            print(f"  Base: ${base_total:8.2f} | Final: ${final_price:8.2f} | Ahorro: ${discount:8.2f}")
            
            total += final_price
        
        print(f"{'='*60}")
        print(f"TOTAL: ${total:.2f}")
        print(f"{'='*60}\n")

# Demostración
laptop = Product("Laptop Pro", 1200.00, "electronics")
mouse = Product("Wireless Mouse", 25.00, "accessories")
keyboard = Product("Mechanical Keyboard", 80.00, "accessories")

customer_gold = Customer("Alice", "gold", date(2020, 1, 15))

# Crear carrito con estrategia inicial
cart = ShoppingCart(customer_gold, RegularPricing())
cart.add_item(laptop, 1)
cart.add_item(mouse, 50)
cart.add_item(keyboard, 2)

# Probar diferentes estrategias
cart.show_receipt()

cart.set_pricing_strategy(LoyaltyPricing())
cart.show_receipt()

cart.set_pricing_strategy(BulkPricing())
cart.show_receipt()

# Usar combo strategy para mejor precio
combo = ComboStrategy([
    LoyaltyPricing(),
    BulkPricing(),
    SeasonalPricing()
])
cart.set_pricing_strategy(combo)
cart.show_receipt()

Cuándo usar Strategy:


Receta 2.5: Decorator - Extender funcionalidad dinámicamente

¿Qué es? Patrón que permite agregar comportamiento adicional a objetos de forma dinámica sin modificar su estructura original.

Caso de uso: Logging y métricas para funciones

import time
import functools
from typing import Callable, Any
import json

def timer(func: Callable) -> Callable:
    """Medir tiempo de ejecución"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"⏱️  {func.__name__} took {duration:.4f}s")
        return result
    return wrapper

def logger(func: Callable) -> Callable:
    """Registrar llamadas a función"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        args_repr = [repr(a) for a in args]
        kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
        signature = ", ".join(args_repr + kwargs_repr)
        print(f"📝 Calling {func.__name__}({signature})")
        result = func(*args, **kwargs)
        print(f"✓ {func.__name__} returned {result!r}")
        return result
    return wrapper

def cache(func: Callable) -> Callable:
    """Cachear resultados de función"""
    cache_dict = {}
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Crear key del cache
        key = str(args) + str(kwargs)
        
        if key in cache_dict:
            print(f"💾 Cache HIT for {func.__name__}")
            return cache_dict[key]
        
        print(f"🔄 Cache MISS for {func.__name__}, computing...")
        result = func(*args, **kwargs)
        cache_dict[key] = result
        return result
    
    wrapper.cache_clear = lambda: cache_dict.clear()
    return wrapper

def retry(max_attempts=3, delay=1):
    """Reintentar función en caso de error"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"❌ Failed after {max_attempts} attempts")
                        raise
                    print(f"⚠️  Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

def validate_args(**type_checks):
    """Validar tipos de argumentos"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Validar kwargs
            for arg_name, expected_type in type_checks.items():
                if arg_name in kwargs:
                    value = kwargs[arg_name]
                    if not isinstance(value, expected_type):
                        raise TypeError(
                            f"{func.__name__}: '{arg_name}' debe ser {expected_type.__name__}, "
                            f"recibido {type(value).__name__}"
                        )
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Ejemplo: Combinar múltiples decorators
@timer
@logger
@cache
def fibonacci(n: int) -> int:
    """Calcular fibonacci con decorators"""
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

@timer
@retry(max_attempts=3, delay=0.5)
def unreliable_api_call(fail_count=2):
    """Simular llamada API que falla aleatoriamente"""
    import random
    if random.random() < (fail_count / 3):
        raise ConnectionError("API timeout")
    return {"status": "success", "data": [1, 2, 3]}

@validate_args(amount=float, currency=str)
def process_payment(amount, currency, user_id=None):
    """Procesar pago con validación de tipos"""
    print(f"💳 Processing ${amount} {currency} for user {user_id}")
    return {"status": "completed"}

# Demostración
print("=== Fibonacci con cache ===")
fibonacci(5)
fibonacci(5)  # Segunda llamada usa cache
fibonacci(6)  # Reutiliza cálculos previos

print("\n=== API con retry ===")
try:
    result = unreliable_api_call()
    print(f"Result: {result}")
except Exception as e:
    print(f"Final error: {e}")

print("\n=== Validación de argumentos ===")
process_payment(amount=99.99, currency="USD", user_id=12345)

try:
    process_payment(amount="99.99", currency="USD")  # Error: amount debe ser float
except TypeError as e:
    print(f"Error: {e}")

Decorator como clase:

class RateLimiter:
    """Limitar tasa de llamadas a función"""
    
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
    
    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            
            # Limpiar llamadas antiguas
            self.calls = [call_time for call_time in self.calls 
                         if now - call_time < self.time_window]
            
            if len(self.calls) >= self.max_calls:
                raise Exception(
                    f"Rate limit exceeded: {self.max_calls} calls per {self.time_window}s"
                )
            
            self.calls.append(now)
            return func(*args, **kwargs)
        
        return wrapper

@RateLimiter(max_calls=3, time_window=5.0)
def api_endpoint(request_id):
    print(f"✓ Processing request {request_id}")
    return {"status": "ok"}

# Probar rate limiter
for i in range(5):
    try:
        api_endpoint(i)
        time.sleep(1)
    except Exception as e:
        print(f"❌ Request {i}: {e}")

Aplicaciones reales:


Lectura y Refactorización de Código

Receta 3.1: Code Smells - Identificar problemas

¿Qué son? Indicadores de que el código puede tener problemas de diseño, aunque técnicamente funcione.

Code Smells comunes y soluciones:

# ❌ CODE SMELL 1: Función muy larga (>20 líneas)
def process_order_bad(order):
    # Validar orden
    if not order.get('items'):
        return {"error": "No items"}
    
    if not order.get('customer'):
        return {"error": "No customer"}
    
    # Calcular total
    total = 0
    for item in order['items']:
        price = item['price']
        quantity = item['quantity']
        discount = item.get('discount', 0)
        total += price * quantity * (1 - discount)
    
    # Aplicar shipping
    if total < 50:
        total += 10
    
    # Aplicar impuestos
    total *= 1.08
    
    # Guardar en DB
    # ... 10 líneas más
    
    return {"total": total}

# ✅ SOLUCIÓN: Extraer métodos (Extract Method)
class OrderProcessor:
    TAX_RATE = 0.08
    SHIPPING_THRESHOLD = 50
    SHIPPING_COST = 10
    
    def process_order(self, order):
        """Punto de entrada claro y conciso"""
        if error := self._validate_order(order):
            return error
        
        subtotal = self._calculate_subtotal(order['items'])
        total = self._apply_fees(subtotal)
        
        return {"total": total}
    
    def _validate_order(self, order):
        """Validación separada"""
        if not order.get('items'):
            return {"error": "No items"}
        if not order.get('customer'):
            return {"error": "No customer"}
        return None
    
    def _calculate_subtotal(self, items):
        """Cálculo de subtotal aislado"""
        return sum(
            item['price'] * item['quantity'] * (1 - item.get('discount', 0))
            for item in items
        )
    
    def _apply_fees(self, subtotal):
        """Aplicar shipping e impuestos"""
        with_shipping = self._add_shipping(subtotal)
        return with_shipping * (1 + self.TAX_RATE)
    
    def _add_shipping(self, subtotal):
        """Lógica de shipping separada"""
        return subtotal + self.SHIPPING_COST if subtotal < self.SHIPPING_THRESHOLD else subtotal

# ❌ CODE SMELL 2: Parámetros largos
def create_user_bad(name, email, age, address, phone, country, city, zip_code, preferences):
    pass

# ✅ SOLUCIÓN: Objeto de parámetros
from dataclasses import dataclass

@dataclass
class UserData:
    name: str
    email: str
    age: int
    address: str
    phone: str
    country: str
    city: str
    zip_code: str
    preferences: dict

def create_user_good(user_data: UserData):
    # Mucho más legible y extensible
    pass

# ❌ CODE SMELL 3: Números mágicos
def calculate_discount_bad(price, customer_type):
    if customer_type == 1:
        return price * 0.9
    elif customer_type == 2:
        return price * 0.85
    elif customer_type == 3:
        return price * 0.75
    return price

# ✅ SOLUCIÓN: Constantes con nombres descriptivos
from enum import Enum

class CustomerType(Enum):
    REGULAR = 1
    PREMIUM = 2
    VIP = 3

DISCOUNT_RATES = {
    CustomerType.REGULAR: 0.10,
    CustomerType.PREMIUM: 0.15,
    CustomerType.VIP: 0.25,
}

def calculate_discount_good(price: float, customer_type: CustomerType) -> float:
    discount_rate = DISCOUNT_RATES.get(customer_type, 0)
    return price * (1 - discount_rate)

# ❌ CODE SMELL 4: Código duplicado
def send_email_to_customer(customer):
    email = customer['email']
    subject = "Thank you"
    body = "Thanks for your purchase"
    # Código de envío...
    pass

def send_email_to_supplier(supplier):
    email = supplier['email']
    subject = "New order"
    body = "You have a new order"
    # Mismo código de envío...
    pass

# ✅ SOLUCIÓN: DRY (Don't Repeat Yourself)
def send_email(recipient_email: str, subject: str, body: str):
    """Función reutilizable para enviar emails"""
    # Código de envío centralizado
    print(f"Sending to {recipient_email}: {subject}")

def send_email_to_customer(customer):
    send_email(
        customer['email'],
        "Thank you",
        "Thanks for your purchase"
    )

def send_email_to_supplier(supplier):
    send_email(
        supplier['email'],
        "New order",
        "You have a new order"
    )

# ❌ CODE SMELL 5: Comentarios que explican QUÉ hace el código
def process_bad(data):
    # Crear una lista vacía
    result = []
    
    # Iterar sobre cada elemento
    for item in data:
        # Multiplicar por 2
        doubled = item * 2
        # Agregar a la lista
        result.append(doubled)
    
    return result

# ✅ SOLUCIÓN: Código auto-documentado (los nombres explican el QUÉ)
def double_all_values(numbers):
    """Los comentarios explican el POR QUÉ, no el QUÉ"""
    return [number * 2 for number in numbers]

Lista de verificación de Code Smells:


Receta 3.2: Principios SOLID

S - Single Responsibility Principle

# ❌ VIOLACIÓN: Clase hace demasiado
class UserManager:
    def create_user(self, data):
        # Validar
        # Guardar en DB
        # Enviar email
        # Registrar log
        pass

# ✅ CORRECTO: Una responsabilidad por clase
class UserValidator:
    def validate(self, user_data):
        if not user_data.get('email'):
            raise ValueError("Email required")
        return True

class UserRepository:
    def save(self, user):
        # Solo interacción con DB
        print(f"Saving user: {user.email}")
        return user

class EmailService:
    def send_welcome_email(self, user):
        # Solo envío de emails
        print(f"Sending email to: {user.email}")

class UserService:
    """Orquesta otros servicios"""
    def __init__(self):
        self.validator = UserValidator()
        self.repository = UserRepository()
        self.email_service = EmailService()
    
    def create_user(self, user_data):
        self.validator.validate(user_data)
        user = self.repository.save(user_data)
        self.email_service.send_welcome_email(user)
        return user

O - Open/Closed Principle

# ❌ VIOLACIÓN: Modificar clase existente para nuevos tipos
class AreaCalculator:
    def calculate(self, shape):
        if shape.type == 'circle':
            return 3.14 * shape.radius ** 2
        elif shape.type == 'rectangle':
            return shape.width * shape.height
        # Agregar nuevo tipo requiere modificar esta clase

# ✅ CORRECTO: Abierto para extensión, cerrado para modificación
from abc import ABC, abstractmethod

class Shape(ABC):
    @abstractmethod
    def area(self):
        pass

class Circle(Shape):
    def __init__(self, radius):
        self.radius = radius
    
    def area(self):
        return 3.14 * self.radius ** 2

class Rectangle(Shape):
    def __init__(self, width, height):
        self.width = width
        self.height = height
    
    def area(self):
        return self.width * self.height

class Triangle(Shape):  # Nueva forma sin modificar código existente
    def __init__(self, base, height):
        self.base = base
        self.height = height
    
    def area(self):
        return 0.5 * self.base * self.height

def calculate_total_area(shapes):
    return sum(shape.area() for shape in shapes)

L - Liskov Substitution Principle

# ❌ VIOLACIÓN: Subclase cambia comportamiento esperado
class Bird:
    def fly(self):
        return "Flying"

class Penguin(Bird):
    def fly(self):
        raise Exception("Can't fly!")  # Rompe contrato

# ✅ CORRECTO: Jerarquía apropiada
class Bird:
    def move(self):
        pass

class FlyingBird(Bird):
    def move(self):
        return "Flying"
    
    def fly(self):
        return "Flying"

class Penguin(Bird):
    def move(self):
        return "Swimming"
    
    def swim(self):
        return "Swimming"

def make_bird_move(bird: Bird):
    print(bird.move())  # Funciona con cualquier Bird

make_bird_move(FlyingBird())
make_bird_move(Penguin())

I - Interface Segregation Principle

# ❌ VIOLACIÓN: Interfaz muy amplia fuerza implementaciones innecesarias
class Worker:
    def work(self):
        pass
    
    def eat(self):
        pass
    
    def sleep(self):
        pass

class Robot(Worker):
    def work(self):
        return "Working"
    
    def eat(self):
        pass  # Robots no comen, método innecesario
    
    def sleep(self):
        pass  # Robots no duermen

# ✅ CORRECTO: Interfaces pequeñas y específicas
class Workable:
    def work(self):
        pass

class Eatable:
    def eat(self):
        pass

class Sleepable:
    def sleep(self):
        pass

class Human(Workable, Eatable, Sleepable):
    def work(self):
        return "Working"
    
    def eat(self):
        return "Eating"
    
    def sleep(self):
        return "Sleeping"

class Robot(Workable):
    def work(self):
        return "Working 24/7"

D - Dependency Inversion Principle

# ❌ VIOLACIÓN: Alto nivel depende de bajo nivel
class MySQLDatabase:
    def save(self, data):
        print("Saving to MySQL")

class UserService:
    def __init__(self):
        self.db = MySQLDatabase()  # Acoplado a implementación concreta
    
    def save_user(self, user):
        self.db.save(user)

# ✅ CORRECTO: Ambos dependen de abstracción
class Database(ABC):
    @abstractmethod
    def save(self, data):
        pass

class MySQLDatabase(Database):
    def save(self, data):
        print(f"Saving to MySQL: {data}")

class PostgreSQLDatabase(Database):
    def save(self, data):
        print(f"Saving to PostgreSQL: {data}")

class MongoDatabase(Database):
    def save(self, data):
        print(f"Saving to MongoDB: {data}")

class UserService:
    def __init__(self, database: Database):  # Depende de abstracción
        self.db = database
    
    def save_user(self, user):
        self.db.save(user)

# Uso: fácil cambiar implementación
mysql_service = UserService(MySQLDatabase())
postgres_service = UserService(PostgreSQLDatabase())
mongo_service = UserService(MongoDatabase())

Sistemas Operativos y Redes

Receta 4.1: Procesos vs Hilos vs Async

¿Cuál usar cuándo?

CaracterísticaProcesosHilosAsync/Await
Aislamiento✅ Total❌ Compartido❌ Compartido
Overhead🔴 Alto🟡 Medio🟢 Bajo
ComunicaciónIPC complejoMemoria compartidaEvent loop
Mejor paraCPU-intensiveI/O + CPUI/O-intensive
GIL (Python)✅ Evita❌ Afectado✅ Evita

Ejemplo práctico: Descargar múltiples archivos

import time
import asyncio
import aiohttp
from multiprocessing import Process, Pool
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Simular descarga
def download_file_sync(url, file_id):
    """Descarga sincrónica (bloqueante)"""
    print(f"Downloading {url}...")
    time.sleep(2)  # Simular I/O
    return f"Downloaded {file_id}"

# 1. SECUENCIAL (baseline)
def download_sequential(urls):
    start = time.time()
    results = [download_file_sync(url, i) for i, url in enumerate(urls)]
    print(f"⏱️  Sequential: {time.time() - start:.2f}s")
    return results

# 2. MULTIPROCESSING (CPU-bound)
def download_multiprocess(urls):
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(download_file_sync, urls, range(len(urls))))
    print(f"⏱️  Multiprocess: {time.time() - start:.2f}s")
    return results

# 3. MULTITHREADING (I/O-bound)
def download_multithread(urls):
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(download_file_sync, urls, range(len(urls))))
    print(f"⏱️  Multithread: {time.time() - start:.2f}s")
    return results

# 4. ASYNC/AWAIT (I/O-bound óptimo)
async def download_file_async(session, url, file_id):
    """Descarga asíncrona (non-blocking)"""
    print(f"Downloading {url}...")
    await asyncio.sleep(2)  # Simular I/O async
    return f"Downloaded {file_id}"

async def download_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [download_file_async(session, url, i) for i, url in enumerate(urls)]
        results = await asyncio.gather(*tasks)
    print(f"⏱️  Async: {time.time() - start:.2f}s")
    return results

# Prueba
urls = [f"https://example.com/file{i}.zip" for i in range(10)]

download_sequential(urls)      # ~20s (10 archivos × 2s)
download_multiprocess(urls)    # ~4-6s (overhead de procesos)
download_multithread(urls)     # ~4s (paralelismo real)
asyncio.run(download_async(urls))  # ~2s (todo concurrente)

Caso de uso real: Web scraper

import asyncio
import aiohttp
from bs4 import BeautifulSoup

class AsyncWebScraper:
    """Scraper asíncrono eficiente"""
    
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_page(self, session, url):
        """Descargar una página"""
        async with self.semaphore:  # Limitar concurrencia
            try:
                async with session.get(url, timeout=10) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
    
    async def scrape_urls(self, urls):
        """Scrapear múltiples URLs concurrentemente"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    async def scrape_and_parse(self, urls):
        """Scrape + parseo"""
        pages = await self.scrape_urls(urls)
        
        parsed_data = []
        for url, html in zip(urls, pages):
            if html:
                soup = BeautifulSoup(html, 'html.parser')
                title = soup.find('title').text if soup.find('title') else "No title"
                parsed_data.append({
                    'url': url,
                    'title': title,
                    'length': len(html)
                })
        
        return parsed_data

# Uso
scraper = AsyncWebScraper(max_concurrent=20)
urls = ["https://example.com", "https://python.org"]
# results = asyncio.run(scraper.scrape_and_parse(urls))

Cuándo usar cada enfoque:

  1. Secuencial: Pocas operaciones, prototipos, debugging
  2. Multiprocessing: CPU-intensive (cálculos matemáticos, machine learning)
  3. Multithreading: I/O-bound con librerías que liberan GIL (requests, file I/O)
  4. Async/Await: I/O-bound puro (muchas conexiones de red, WebSockets)

Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?

Comparación:

CaracterísticaTCPUDP
Confiabilidad✅ Garantizada❌ Best effort
Orden✅ Ordenado❌ Puede desordenarse
Overhead🔴 Alto🟢 Bajo
Velocidad🟡 Media✅ Alta
Uso de conexiónOrientado a conexiónSin conexión
Casos de usoHTTP, FTP, EmailDNS, Streaming, Gaming

Ejemplo: Chat en tiempo real

import socket
import threading
import json
from datetime import datetime

# TCP Chat Server (confiable, ordenado)
class TCPChatServer:
    def __init__(self, host='localhost', port=5000):
        self.host = host
        self.port = port
        self.clients = []
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    def start(self):
        """Iniciar servidor TCP"""
        self.socket.bind((self.host, self.port))
        self.socket.listen(5)
        print(f"🟢 TCP Server listening on {self.host}:{self.port}")
        
        while True:
            client_socket, address = self.socket.accept()
            print(f"✓ New connection from {address}")
            self.clients.append(client_socket)
            
            # Manejar cliente en thread separado
            thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address)
            )
            thread.start()
    
    def handle_client(self, client_socket, address):
        """Manejar mensajes de cliente"""
        while True:
            try:
                message = client_socket.recv(1024).decode('utf-8')
                if not message:
                    break
                
                print(f"📨 From {address}: {message}")
                
                # Broadcast a todos los clientes
                self.broadcast(message, client_socket)
            
            except Exception as e:
                print(f"❌ Error: {e}")
                break
        
        client_socket.close()
        self.clients.remove(client_socket)
        print(f"✗ {address} disconnected")
    
    def broadcast(self, message, sender_socket):
        """Enviar mensaje a todos excepto sender"""
        for client in self.clients:
            if client != sender_socket:
                try:
                    client.send(message.encode('utf-8'))
                except:
                    client.close()
                    self.clients.remove(client)

# UDP Game Server (rápido, puede perder paquetes)
class UDPGameServer:
    """Servidor UDP para posiciones de jugadores en tiempo real"""
    
    def __init__(self, host='localhost', port=5001):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))
        self.players = {}  # {address: player_data}
    
    def start(self):
        """Iniciar servidor UDP"""
        print(f"🟢 UDP Server listening on {self.host}:{self.port}")
        
        while True:
            try:
                data, address = self.socket.recvfrom(1024)
                message = json.loads(data.decode('utf-8'))
                
                # Actualizar posición del jugador
                self.players[address] = {
                    'position': message.get('position'),
                    'timestamp': datetime.now().isoformat()
                }
                
                # Enviar estado del juego a todos (broadcast)
                self.broadcast_game_state()
            
            except Exception as e:
                print(f"❌ Error: {e}")
    
    def broadcast_game_state(self):
        """Enviar posiciones de todos los jugadores"""
        game_state = {
            'players': [
                {
                    'address': str(addr),
                    'position': data['position']
                }
                for addr, data in self.players.items()
            ]
        }
        
        message = json.dumps(game_state).encode('utf-8')
        
        for address in self.players.keys():
            self.socket.sendto(message, address)

# Cliente TCP
def tcp_client(host='localhost', port=5000):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    
    while True:
        message = input("You: ")
        client.send(message.encode('utf-8'))
        
        response = client.recv(1024).decode('utf-8')
        print(f"Received: {response}")

# Cliente UDP (enviar posición cada frame)
def udp_game_client(host='localhost', port=5001):
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    position = [0, 0]  # x, y
    
    while True:
        # Simular movimiento
        position[0] += 1
        position[1] += 0.5
        
        message = json.dumps({'position': position})
        client.sendto(message.encode('utf-8'), (host, port))
        
        # Recibir estado del juego
        data, _ = client.recvfrom(4096)
        game_state = json.loads(data.decode('utf-8'))
        print(f"Players: {len(game_state['players'])}")
        
        time.sleep(0.016)  # ~60 FPS

Cuándo usar cada protocolo:

TCP:

UDP:


Receta 4.3: HTTP/HTTPS - Fundamentos Web

Métodos HTTP y casos de uso:

import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

# Base de datos simulada
users = {}

# GET - Obtener recursos (idempotente, cacheable)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Obtener usuario por ID"""
    user = users.get(user_id)
    if user:
        return jsonify(user), 200
    return jsonify({"error": "User not found"}), 404

# POST - Crear nuevo recurso (no idempotente)
@app.route('/users', methods=['POST'])
def create_user():
    """Crear nuevo usuario"""
    data = request.json
    user_id = len(users) + 1
    users[user_id] = {
        'id': user_id,
        'name': data['name'],
        'email': data['email']
    }
    return jsonify(users[user_id]), 201  # 201 Created

# PUT - Actualizar completamente (idempotente)
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Reemplazar usuario completamente"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    data = request.json
    users[user_id] = {
        'id': user_id,
        'name': data['name'],
        'email': data['email']
    }
    return jsonify(users[user_id]), 200

# PATCH - Actualizar parcialmente (idempotente)
@app.route('/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
    """Actualizar campos específicos"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    data = request.json
    users[user_id].update(data)  # Solo actualizar campos provistos
    return jsonify(users[user_id]), 200

# DELETE - Eliminar recurso (idempotente)
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Eliminar usuario"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    del users[user_id]
    return '', 204  # 204 No Content

# HEAD - Como GET pero solo headers (para verificar existencia)
@app.route('/users/<int:user_id>', methods=['HEAD'])
def head_user(user_id):
    """Verificar si usuario existe sin traer body"""
    if user_id in users:
        return '', 200
    return '', 404

# OPTIONS - Descubrir métodos soportados
@app.route('/users/<int:user_id>', methods=['OPTIONS'])
def options_user(user_id):
    """Retornar métodos HTTP permitidos"""
    return '', 200, {'Allow': 'GET, PUT, PATCH, DELETE, HEAD, OPTIONS'}

if __name__ == '__main__':
    app.run(debug=True)

Cliente HTTP con manejo de errores:

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

class RobustHTTPClient:
    """Cliente HTTP con retry automático y timeout"""
    
    def __init__(self, base_url, timeout=10, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        
        # Configurar retry strategy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,  # Esperar 1s, 2s, 4s...
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session = requests.Session()
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def get(self, endpoint, **kwargs):
        """GET request con manejo de errores"""
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.get(url, timeout=self.timeout, **kwargs)
            response.raise_for_status()  # Lanzar excepción si 4xx/5xx
            return response.json()
        except requests.exceptions.Timeout:
            print(f"⏱️  Timeout al conectar a {url}")
            raise
        except requests.exceptions.ConnectionError:
            print(f"❌ Error de conexión a {url}")
            raise
        except requests.exceptions.HTTPError as e:
            print(f"❌ HTTP Error: {e.response.status_code}")
            raise
    
    def post(self, endpoint, data, **kwargs):
        """POST request"""
        url = f"{self.base_url}{endpoint}"
        response = self.session.post(
            url,
            json=data,
            timeout=self.timeout,
            **kwargs
        )
        response.raise_for_status()
        return response.json()

# Uso
client = RobustHTTPClient("https://api.example.com", timeout=5, max_retries=3)

try:
    users = client.get("/users")
    new_user = client.post("/users", {"name": "Alice", "email": "alice@example.com"})
except Exception as e:
    print(f"Failed: {e}")

Headers importantes:

# Configurar headers comunes
headers = {
    # Autenticación
    'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
    
    # Content negotiation
    'Content-Type': 'application/json',
    'Accept': 'application/json',
    
    # Caching
    'Cache-Control': 'max-age=3600',
    'ETag': '"33a64df551425fcc55e4d42a148795d9f25f89d4"',
    
    # CORS
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
    
    # Rate limiting
    'X-RateLimit-Limit': '100',
    'X-RateLimit-Remaining': '95',
    
    # Security
    'Strict-Transport-Security': 'max-age=31536000',
    'X-Content-Type-Options': 'nosniff',
}

Status codes esenciales:


¡Felicidades! 🎉

Has completado la FASE 1: Fundamentos Reforzados del roadmap del desarrollador del futuro.

Lo que has aprendido:

✅ Estructuras de datos y algoritmos fundamentales
✅ Patrones de diseño esenciales
✅ Principios de código limpio y refactorización
✅ Fundamentos de sistemas operativos y redes

Próximos pasos:

Continue con la FASE 2: Arquitectura y Diseño de Sistemas para aprender sobre:

Recursos adicionales recomendados:


Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo