🧑💻 FASE 1: Fundamentos Reforzados - Sistemas Operativos y Redes
Recetas prácticas para dominar los fundamentos técnicos del desarrollo de software
📚 Tabla de Contenidos
- Receta 4.1: Procesos vs Hilos vs Async
- Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?
- Receta 4.3: HTTP/HTTPS - Fundamentos Web
Sistemas Operativos y Redes
Receta 4.1: Procesos vs Hilos vs Async
¿Cuál usar cuándo?
| Característica | Procesos | Hilos | Async/Await |
|---|---|---|---|
| Aislamiento | ✅ Total | ❌ Compartido | ❌ Compartido |
| Overhead | 🔴 Alto | 🟡 Medio | 🟢 Bajo |
| Comunicación | IPC complejo | Memoria compartida | Event loop |
| Mejor para | CPU-intensive | I/O + CPU | I/O-intensive |
| GIL (Python) | ✅ Evita | ❌ Afectado | ✅ Evita |
Ejemplo práctico: Descargar múltiples archivos
import time
import asyncio
import aiohttp
from multiprocessing import Process, Pool
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Simular descarga
def download_file_sync(url, file_id):
"""Descarga sincrónica (bloqueante)"""
print(f"Downloading {url}...")
time.sleep(2) # Simular I/O
return f"Downloaded {file_id}"
# 1. SECUENCIAL (baseline)
def download_sequential(urls):
start = time.time()
results = [download_file_sync(url, i) for i, url in enumerate(urls)]
print(f"⏱️ Sequential: {time.time() - start:.2f}s")
return results
# 2. MULTIPROCESSING (CPU-bound)
def download_multiprocess(urls):
start = time.time()
with ProcessPoolExecutor() as executor:
results = list(executor.map(download_file_sync, urls, range(len(urls))))
print(f"⏱️ Multiprocess: {time.time() - start:.2f}s")
return results
# 3. MULTITHREADING (I/O-bound)
def download_multithread(urls):
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(download_file_sync, urls, range(len(urls))))
print(f"⏱️ Multithread: {time.time() - start:.2f}s")
return results
# 4. ASYNC/AWAIT (I/O-bound óptimo)
async def download_file_async(session, url, file_id):
"""Descarga asíncrona (non-blocking)"""
print(f"Downloading {url}...")
await asyncio.sleep(2) # Simular I/O async
return f"Downloaded {file_id}"
async def download_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [download_file_async(session, url, i) for i, url in enumerate(urls)]
results = await asyncio.gather(*tasks)
print(f"⏱️ Async: {time.time() - start:.2f}s")
return results
# Prueba
urls = [f"https://example.com/file{i}.zip" for i in range(10)]
download_sequential(urls) # ~20s (10 archivos × 2s)
download_multiprocess(urls) # ~4-6s (overhead de procesos)
download_multithread(urls) # ~4s (paralelismo real)
asyncio.run(download_async(urls)) # ~2s (todo concurrente) Caso de uso real: Web scraper
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
"""Scraper asíncrono eficiente"""
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_page(self, session, url):
"""Descargar una página"""
async with self.semaphore: # Limitar concurrencia
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
"""Scrapear múltiples URLs concurrentemente"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def scrape_and_parse(self, urls):
"""Scrape + parseo"""
pages = await self.scrape_urls(urls)
parsed_data = []
for url, html in zip(urls, pages):
if html:
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title').text if soup.find('title') else "No title"
parsed_data.append({
'url': url,
'title': title,
'length': len(html)
})
return parsed_data
# Uso
scraper = AsyncWebScraper(max_concurrent=20)
urls = ["https://example.com", "https://python.org"]
# results = asyncio.run(scraper.scrape_and_parse(urls)) Cuándo usar cada enfoque:
- Secuencial: Pocas operaciones, prototipos, debugging
- Multiprocessing: CPU-intensive (cálculos matemáticos, machine learning)
- Multithreading: I/O-bound con librerías que liberan GIL (requests, file I/O)
- Async/Await: I/O-bound puro (muchas conexiones de red, WebSockets)
Receta 4.2: TCP vs UDP - ¿Cuándo usar cada uno?
Comparación:
| Característica | TCP | UDP |
|---|---|---|
| Confiabilidad | ✅ Garantizada | ❌ Best effort |
| Orden | ✅ Ordenado | ❌ Puede desordenarse |
| Overhead | 🔴 Alto | 🟢 Bajo |
| Velocidad | 🟡 Media | ✅ Alta |
| Uso de conexión | Orientado a conexión | Sin conexión |
| Casos de uso | HTTP, FTP, Email | DNS, Streaming, Gaming |
Ejemplo: Chat en tiempo real
import socket
import threading
import json
from datetime import datetime
# TCP Chat Server (confiable, ordenado)
class TCPChatServer:
def __init__(self, host='localhost', port=5000):
self.host = host
self.port = port
self.clients = []
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def start(self):
"""Iniciar servidor TCP"""
self.socket.bind((self.host, self.port))
self.socket.listen(5)
print(f"🟢 TCP Server listening on {self.host}:{self.port}")
while True:
client_socket, address = self.socket.accept()
print(f"✓ New connection from {address}")
self.clients.append(client_socket)
# Manejar cliente en thread separado
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
thread.start()
def handle_client(self, client_socket, address):
"""Manejar mensajes de cliente"""
while True:
try:
message = client_socket.recv(1024).decode('utf-8')
if not message:
break
print(f"📨 From {address}: {message}")
# Broadcast a todos los clientes
self.broadcast(message, client_socket)
except Exception as e:
print(f"❌ Error: {e}")
break
client_socket.close()
self.clients.remove(client_socket)
print(f"✗ {address} disconnected")
def broadcast(self, message, sender_socket):
"""Enviar mensaje a todos excepto sender"""
for client in self.clients:
if client != sender_socket:
try:
client.send(message.encode('utf-8'))
except:
client.close()
self.clients.remove(client)
# UDP Game Server (rápido, puede perder paquetes)
class UDPGameServer:
"""Servidor UDP para posiciones de jugadores en tiempo real"""
def __init__(self, host='localhost', port=5001):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((self.host, self.port))
self.players = {} # {address: player_data}
def start(self):
"""Iniciar servidor UDP"""
print(f"🟢 UDP Server listening on {self.host}:{self.port}")
while True:
try:
data, address = self.socket.recvfrom(1024)
message = json.loads(data.decode('utf-8'))
# Actualizar posición del jugador
self.players[address] = {
'position': message.get('position'),
'timestamp': datetime.now().isoformat()
}
# Enviar estado del juego a todos (broadcast)
self.broadcast_game_state()
except Exception as e:
print(f"❌ Error: {e}")
def broadcast_game_state(self):
"""Enviar posiciones de todos los jugadores"""
game_state = {
'players': [
{
'address': str(addr),
'position': data['position']
}
for addr, data in self.players.items()
]
}
message = json.dumps(game_state).encode('utf-8')
for address in self.players.keys():
self.socket.sendto(message, address)
# Cliente TCP
def tcp_client(host='localhost', port=5000):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
while True:
message = input("You: ")
client.send(message.encode('utf-8'))
response = client.recv(1024).decode('utf-8')
print(f"Received: {response}")
# Cliente UDP (enviar posición cada frame)
def udp_game_client(host='localhost', port=5001):
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
position = [0, 0] # x, y
while True:
# Simular movimiento
position[0] += 1
position[1] += 0.5
message = json.dumps({'position': position})
client.sendto(message.encode('utf-8'), (host, port))
# Recibir estado del juego
data, _ = client.recvfrom(4096)
game_state = json.loads(data.decode('utf-8'))
print(f"Players: {len(game_state['players'])}")
time.sleep(0.016) # ~60 FPS Cuándo usar cada protocolo:
TCP:
- APIs REST (HTTP/HTTPS)
- Transferencia de archivos
- Emails (SMTP, IMAP)
- Bases de datos
- Chat donde cada mensaje importa
UDP:
- Video/Audio streaming
- Gaming online (posiciones de jugadores)
- DNS queries
- VoIP
- Broadcasting/Multicasting
- Telemetría IoT (cuando pérdida ocasional es aceptable)
Receta 4.3: HTTP/HTTPS - Fundamentos Web
Métodos HTTP y casos de uso:
import requests
from flask import Flask, request, jsonify
app = Flask(__name__)
# Base de datos simulada
users = {}
# GET - Obtener recursos (idempotente, cacheable)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""Obtener usuario por ID"""
user = users.get(user_id)
if user:
return jsonify(user), 200
return jsonify({"error": "User not found"}), 404
# POST - Crear nuevo recurso (no idempotente)
@app.route('/users', methods=['POST'])
def create_user():
"""Crear nuevo usuario"""
data = request.json
user_id = len(users) + 1
users[user_id] = {
'id': user_id,
'name': data['name'],
'email': data['email']
}
return jsonify(users[user_id]), 201 # 201 Created
# PUT - Actualizar completamente (idempotente)
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""Reemplazar usuario completamente"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
data = request.json
users[user_id] = {
'id': user_id,
'name': data['name'],
'email': data['email']
}
return jsonify(users[user_id]), 200
# PATCH - Actualizar parcialmente (idempotente)
@app.route('/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
"""Actualizar campos específicos"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
data = request.json
users[user_id].update(data) # Solo actualizar campos provistos
return jsonify(users[user_id]), 200
# DELETE - Eliminar recurso (idempotente)
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""Eliminar usuario"""
if user_id not in users:
return jsonify({"error": "User not found"}), 404
del users[user_id]
return '', 204 # 204 No Content
# HEAD - Como GET pero solo headers (para verificar existencia)
@app.route('/users/<int:user_id>', methods=['HEAD'])
def head_user(user_id):
"""Verificar si usuario existe sin traer body"""
if user_id in users:
return '', 200
return '', 404
# OPTIONS - Descubrir métodos soportados
@app.route('/users/<int:user_id>', methods=['OPTIONS'])
def options_user(user_id):
"""Retornar métodos HTTP permitidos"""
return '', 200, {'Allow': 'GET, PUT, PATCH, DELETE, HEAD, OPTIONS'}
if __name__ == '__main__':
app.run(debug=True) Cliente HTTP con manejo de errores:
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class RobustHTTPClient:
"""Cliente HTTP con retry automático y timeout"""
def __init__(self, base_url, timeout=10, max_retries=3):
self.base_url = base_url
self.timeout = timeout
# Configurar retry strategy
retry_strategy = Retry(
total=max_retries,
backoff_factor=1, # Esperar 1s, 2s, 4s...
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session = requests.Session()
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def get(self, endpoint, **kwargs):
"""GET request con manejo de errores"""
url = f"{self.base_url}{endpoint}"
try:
response = self.session.get(url, timeout=self.timeout, **kwargs)
response.raise_for_status() # Lanzar excepción si 4xx/5xx
return response.json()
except requests.exceptions.Timeout:
print(f"⏱️ Timeout al conectar a {url}")
raise
except requests.exceptions.ConnectionError:
print(f"❌ Error de conexión a {url}")
raise
except requests.exceptions.HTTPError as e:
print(f"❌ HTTP Error: {e.response.status_code}")
raise
def post(self, endpoint, data, **kwargs):
"""POST request"""
url = f"{self.base_url}{endpoint}"
response = self.session.post(
url,
json=data,
timeout=self.timeout,
**kwargs
)
response.raise_for_status()
return response.json()
# Uso
client = RobustHTTPClient("https://api.example.com", timeout=5, max_retries=3)
try:
users = client.get("/users")
new_user = client.post("/users", {"name": "Alice", "email": "alice@example.com"})
except Exception as e:
print(f"Failed: {e}") Headers importantes:
# Configurar headers comunes
headers = {
# Autenticación
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
# Content negotiation
'Content-Type': 'application/json',
'Accept': 'application/json',
# Caching
'Cache-Control': 'max-age=3600',
'ETag': '"33a64df551425fcc55e4d42a148795d9f25f89d4"',
# CORS
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
# Rate limiting
'X-RateLimit-Limit': '100',
'X-RateLimit-Remaining': '95',
# Security
'Strict-Transport-Security': 'max-age=31536000',
'X-Content-Type-Options': 'nosniff',
} Status codes esenciales:
- 2xx Éxito: 200 OK, 201 Created, 204 No Content
- 3xx Redirección: 301 Moved Permanently, 302 Found, 304 Not Modified
- 4xx Client Error: 400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found, 429 Too Many Requests
- 5xx Server Error: 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable
¡Felicidades! 🎉
Has completado la FASE 1: Fundamentos Reforzados del roadmap del desarrollador del futuro.
Lo que has aprendido:
✅ Estructuras de datos y algoritmos fundamentales
✅ Patrones de diseño esenciales
✅ Principios de código limpio y refactorización
✅ Fundamentos de sistemas operativos y redes
Próximos pasos:
Continue con la FASE 2: Arquitectura y Diseño de Sistemas para aprender sobre:
- Diseño de software escalable
- Bases de datos avanzadas
- APIs y comunicación entre servicios
Recursos adicionales recomendados:
- 📖 “Clean Code” - Robert C. Martin
- 📖 “Design Patterns” - Gang of Four
- 📖 “Introduction to Algorithms” - CLRS
- 🎥 MIT OpenCourseWare - Algorithms
- 💻 LeetCode / HackerRank para práctica
Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo