🧑‍💻 FASE 1: Fundamentos Reforzados - Sistemas Operativos y Redes

Recetas prácticas para dominar los fundamentos técnicos del desarrollo de software


📚 Tabla de Contenidos

  1. Receta 4.1: Procesos vs Hilos vs Async
  2. Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?
  3. Receta 4.3: HTTP/HTTPS - Fundamentos Web

Sistemas Operativos y Redes

Receta 4.1: Procesos vs Hilos vs Async

¿Cuál usar cuándo?

CaracterísticaProcesosHilosAsync/Await
Aislamiento✅ Total❌ Compartido❌ Compartido
Overhead🔴 Alto🟡 Medio🟢 Bajo
ComunicaciónIPC complejoMemoria compartidaEvent loop
Mejor paraCPU-intensiveI/O + CPUI/O-intensive
GIL (Python)✅ Evita❌ Afectado✅ Evita

Ejemplo práctico: Descargar múltiples archivos

import time
import asyncio
import aiohttp
from multiprocessing import Process, Pool
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Simular descarga
def download_file_sync(url, file_id):
    """Descarga sincrónica (bloqueante)"""
    print(f"Downloading {url}...")
    time.sleep(2)  # Simular I/O
    return f"Downloaded {file_id}"

# 1. SECUENCIAL (baseline)
def download_sequential(urls):
    start = time.time()
    results = [download_file_sync(url, i) for i, url in enumerate(urls)]
    print(f"⏱️  Sequential: {time.time() - start:.2f}s")
    return results

# 2. MULTIPROCESSING (CPU-bound)
def download_multiprocess(urls):
    start = time.time()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(download_file_sync, urls, range(len(urls))))
    print(f"⏱️  Multiprocess: {time.time() - start:.2f}s")
    return results

# 3. MULTITHREADING (I/O-bound)
def download_multithread(urls):
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(download_file_sync, urls, range(len(urls))))
    print(f"⏱️  Multithread: {time.time() - start:.2f}s")
    return results

# 4. ASYNC/AWAIT (I/O-bound óptimo)
async def download_file_async(session, url, file_id):
    """Descarga asíncrona (non-blocking)"""
    print(f"Downloading {url}...")
    await asyncio.sleep(2)  # Simular I/O async
    return f"Downloaded {file_id}"

async def download_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [download_file_async(session, url, i) for i, url in enumerate(urls)]
        results = await asyncio.gather(*tasks)
    print(f"⏱️  Async: {time.time() - start:.2f}s")
    return results

# Prueba
urls = [f"https://example.com/file{i}.zip" for i in range(10)]

download_sequential(urls)      # ~20s (10 archivos × 2s)
download_multiprocess(urls)    # ~4-6s (overhead de procesos)
download_multithread(urls)     # ~4s (paralelismo real)
asyncio.run(download_async(urls))  # ~2s (todo concurrente)

Caso de uso real: Web scraper

import asyncio
import aiohttp
from bs4 import BeautifulSoup

class AsyncWebScraper:
    """Scraper asíncrono eficiente"""
    
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_page(self, session, url):
        """Descargar una página"""
        async with self.semaphore:  # Limitar concurrencia
            try:
                async with session.get(url, timeout=10) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
    
    async def scrape_urls(self, urls):
        """Scrapear múltiples URLs concurrentemente"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    async def scrape_and_parse(self, urls):
        """Scrape + parseo"""
        pages = await self.scrape_urls(urls)
        
        parsed_data = []
        for url, html in zip(urls, pages):
            if html:
                soup = BeautifulSoup(html, 'html.parser')
                title = soup.find('title').text if soup.find('title') else "No title"
                parsed_data.append({
                    'url': url,
                    'title': title,
                    'length': len(html)
                })
        
        return parsed_data

# Uso
scraper = AsyncWebScraper(max_concurrent=20)
urls = ["https://example.com", "https://python.org"]
# results = asyncio.run(scraper.scrape_and_parse(urls))

Cuándo usar cada enfoque:

  1. Secuencial: Pocas operaciones, prototipos, debugging
  2. Multiprocessing: CPU-intensive (cálculos matemáticos, machine learning)
  3. Multithreading: I/O-bound con librerías que liberan GIL (requests, file I/O)
  4. Async/Await: I/O-bound puro (muchas conexiones de red, WebSockets)

Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?

Comparación:

CaracterísticaTCPUDP
Confiabilidad✅ Garantizada❌ Best effort
Orden✅ Ordenado❌ Puede desordenarse
Overhead🔴 Alto🟢 Bajo
Velocidad🟡 Media✅ Alta
Uso de conexiónOrientado a conexiónSin conexión
Casos de usoHTTP, FTP, EmailDNS, Streaming, Gaming

Ejemplo: Chat en tiempo real

import socket
import threading
import json
from datetime import datetime

# TCP Chat Server (confiable, ordenado)
class TCPChatServer:
    def __init__(self, host='localhost', port=5000):
        self.host = host
        self.port = port
        self.clients = []
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    def start(self):
        """Iniciar servidor TCP"""
        self.socket.bind((self.host, self.port))
        self.socket.listen(5)
        print(f"🟢 TCP Server listening on {self.host}:{self.port}")
        
        while True:
            client_socket, address = self.socket.accept()
            print(f"✓ New connection from {address}")
            self.clients.append(client_socket)
            
            # Manejar cliente en thread separado
            thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address)
            )
            thread.start()
    
    def handle_client(self, client_socket, address):
        """Manejar mensajes de cliente"""
        while True:
            try:
                message = client_socket.recv(1024).decode('utf-8')
                if not message:
                    break
                
                print(f"📨 From {address}: {message}")
                
                # Broadcast a todos los clientes
                self.broadcast(message, client_socket)
            
            except Exception as e:
                print(f"❌ Error: {e}")
                break
        
        client_socket.close()
        self.clients.remove(client_socket)
        print(f"✗ {address} disconnected")
    
    def broadcast(self, message, sender_socket):
        """Enviar mensaje a todos excepto sender"""
        for client in self.clients:
            if client != sender_socket:
                try:
                    client.send(message.encode('utf-8'))
                except:
                    client.close()
                    self.clients.remove(client)

# UDP Game Server (rápido, puede perder paquetes)
class UDPGameServer:
    """Servidor UDP para posiciones de jugadores en tiempo real"""
    
    def __init__(self, host='localhost', port=5001):
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))
        self.players = {}  # {address: player_data}
    
    def start(self):
        """Iniciar servidor UDP"""
        print(f"🟢 UDP Server listening on {self.host}:{self.port}")
        
        while True:
            try:
                data, address = self.socket.recvfrom(1024)
                message = json.loads(data.decode('utf-8'))
                
                # Actualizar posición del jugador
                self.players[address] = {
                    'position': message.get('position'),
                    'timestamp': datetime.now().isoformat()
                }
                
                # Enviar estado del juego a todos (broadcast)
                self.broadcast_game_state()
            
            except Exception as e:
                print(f"❌ Error: {e}")
    
    def broadcast_game_state(self):
        """Enviar posiciones de todos los jugadores"""
        game_state = {
            'players': [
                {
                    'address': str(addr),
                    'position': data['position']
                }
                for addr, data in self.players.items()
            ]
        }
        
        message = json.dumps(game_state).encode('utf-8')
        
        for address in self.players.keys():
            self.socket.sendto(message, address)

# Cliente TCP
def tcp_client(host='localhost', port=5000):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    
    while True:
        message = input("You: ")
        client.send(message.encode('utf-8'))
        
        response = client.recv(1024).decode('utf-8')
        print(f"Received: {response}")

# Cliente UDP (enviar posición cada frame)
def udp_game_client(host='localhost', port=5001):
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    position = [0, 0]  # x, y
    
    while True:
        # Simular movimiento
        position[0] += 1
        position[1] += 0.5
        
        message = json.dumps({'position': position})
        client.sendto(message.encode('utf-8'), (host, port))
        
        # Recibir estado del juego
        data, _ = client.recvfrom(4096)
        game_state = json.loads(data.decode('utf-8'))
        print(f"Players: {len(game_state['players'])}")
        
        time.sleep(0.016)  # ~60 FPS

Cuándo usar cada protocolo:

TCP:

UDP:


Receta 4.3: HTTP/HTTPS - Fundamentos Web

Métodos HTTP y casos de uso:

import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

# Base de datos simulada
users = {}

# GET - Obtener recursos (idempotente, cacheable)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Obtener usuario por ID"""
    user = users.get(user_id)
    if user:
        return jsonify(user), 200
    return jsonify({"error": "User not found"}), 404

# POST - Crear nuevo recurso (no idempotente)
@app.route('/users', methods=['POST'])
def create_user():
    """Crear nuevo usuario"""
    data = request.json
    user_id = len(users) + 1
    users[user_id] = {
        'id': user_id,
        'name': data['name'],
        'email': data['email']
    }
    return jsonify(users[user_id]), 201  # 201 Created

# PUT - Actualizar completamente (idempotente)
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Reemplazar usuario completamente"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    data = request.json
    users[user_id] = {
        'id': user_id,
        'name': data['name'],
        'email': data['email']
    }
    return jsonify(users[user_id]), 200

# PATCH - Actualizar parcialmente (idempotente)
@app.route('/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
    """Actualizar campos específicos"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    data = request.json
    users[user_id].update(data)  # Solo actualizar campos provistos
    return jsonify(users[user_id]), 200

# DELETE - Eliminar recurso (idempotente)
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Eliminar usuario"""
    if user_id not in users:
        return jsonify({"error": "User not found"}), 404
    
    del users[user_id]
    return '', 204  # 204 No Content

# HEAD - Como GET pero solo headers (para verificar existencia)
@app.route('/users/<int:user_id>', methods=['HEAD'])
def head_user(user_id):
    """Verificar si usuario existe sin traer body"""
    if user_id in users:
        return '', 200
    return '', 404

# OPTIONS - Descubrir métodos soportados
@app.route('/users/<int:user_id>', methods=['OPTIONS'])
def options_user(user_id):
    """Retornar métodos HTTP permitidos"""
    return '', 200, {'Allow': 'GET, PUT, PATCH, DELETE, HEAD, OPTIONS'}

if __name__ == '__main__':
    app.run(debug=True)

Cliente HTTP con manejo de errores:

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

class RobustHTTPClient:
    """Cliente HTTP con retry automático y timeout"""
    
    def __init__(self, base_url, timeout=10, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        
        # Configurar retry strategy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,  # Esperar 1s, 2s, 4s...
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session = requests.Session()
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def get(self, endpoint, **kwargs):
        """GET request con manejo de errores"""
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.get(url, timeout=self.timeout, **kwargs)
            response.raise_for_status()  # Lanzar excepción si 4xx/5xx
            return response.json()
        except requests.exceptions.Timeout:
            print(f"⏱️  Timeout al conectar a {url}")
            raise
        except requests.exceptions.ConnectionError:
            print(f"❌ Error de conexión a {url}")
            raise
        except requests.exceptions.HTTPError as e:
            print(f"❌ HTTP Error: {e.response.status_code}")
            raise
    
    def post(self, endpoint, data, **kwargs):
        """POST request"""
        url = f"{self.base_url}{endpoint}"
        response = self.session.post(
            url,
            json=data,
            timeout=self.timeout,
            **kwargs
        )
        response.raise_for_status()
        return response.json()

# Uso
client = RobustHTTPClient("https://api.example.com", timeout=5, max_retries=3)

try:
    users = client.get("/users")
    new_user = client.post("/users", {"name": "Alice", "email": "alice@example.com"})
except Exception as e:
    print(f"Failed: {e}")

Headers importantes:

# Configurar headers comunes
headers = {
    # Autenticación
    'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
    
    # Content negotiation
    'Content-Type': 'application/json',
    'Accept': 'application/json',
    
    # Caching
    'Cache-Control': 'max-age=3600',
    'ETag': '"33a64df551425fcc55e4d42a148795d9f25f89d4"',
    
    # CORS
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
    
    # Rate limiting
    'X-RateLimit-Limit': '100',
    'X-RateLimit-Remaining': '95',
    
    # Security
    'Strict-Transport-Security': 'max-age=31536000',
    'X-Content-Type-Options': 'nosniff',
}

Status codes esenciales:


¡Felicidades! 🎉

Has completado la FASE 1: Fundamentos Reforzados del roadmap del desarrollador del futuro.

Lo que has aprendido:

✅ Estructuras de datos y algoritmos fundamentales
✅ Patrones de diseño esenciales
✅ Principios de código limpio y refactorización
✅ Fundamentos de sistemas operativos y redes

Próximos pasos:

Continue con la FASE 2: Arquitectura y Diseño de Sistemas para aprender sobre:

Recursos adicionales recomendados:


Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo