🏗️ FASE 2: Arquitectura y Diseño de Sistemas
Recetas prácticas para diseñar sistemas escalables y robustos
📚 Tabla de Contenidos
APIs y Comunicación entre Servicios
Receta 2.8: REST vs GraphQL vs gRPC
Comparación:
| Característica | REST | GraphQL | gRPC |
|---|---|---|---|
| Protocolo | HTTP/JSON | HTTP/JSON | HTTP/2+Protobuf |
| Queries | Múltiples endpoints | Single endpoint | RPC calls |
| Over/Under-fetching | ❌ Común | ✅ Resuelve | ✅ Evita |
| Performance | 🟡 Media | 🟡 Media | 🟢 Alta |
| Tipado | ❌ Débil | ✅ Fuerte | ✅ Fuerte |
| Streaming | ❌ Difícil | ⚠️ Subscriptions | ✅ Bidireccional |
| Caching | ✅ HTTP cache | ❌ Complejo | ❌ Custom |
| Learning curve | 🟢 Fácil | 🟡 Media | 🔴 Alta |
REST API completo:
from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta
import os

app = Flask(__name__)
# The signing secret can be supplied through the SECRET_KEY environment
# variable; the placeholder below is kept only as a backward-compatible default
# and must not be used for real deployments.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')
# ===== AUTHENTICATION & AUTHORIZATION =====
def token_required(f):
    """Decorator that protects an endpoint with a JWT bearer token.

    The wrapped view is called with the token's ``user_id`` claim as its first
    positional argument, followed by the original arguments.

    Returns a 401 JSON response when the ``Authorization`` header is missing,
    is not of the form ``<scheme> <token>``, the token fails HS256
    verification, or the payload has no ``user_id`` claim.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({'error': 'Token missing'}), 401
        try:
            # Expected header shape: "Bearer eyJhbGc..."
            token = auth_header.split(' ')[1]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']
        except (IndexError, KeyError, jwt.InvalidTokenError):
            # Only malformed headers and invalid tokens map to 401; any other
            # error (e.g. a programming mistake) is allowed to surface.
            return jsonify({'error': 'Token invalid'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
@app.route('/auth/login', methods=['POST'])
def login():
    """Check the posted credentials and issue a JWT valid for 24 hours.

    Expects a JSON object with ``email`` and ``password``. Returns
    ``{'token': ...}`` with 200 on success, 400 when the body is not a JSON
    object, and 401 when the credentials do not match.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A missing, unparsable or non-object body would otherwise crash on .get()
        return jsonify({'error': 'Invalid JSON body'}), 400

    # Credential check (simplified: a single hard-coded demo account)
    if data.get('email') == 'user@example.com' and data.get('password') == 'password':
        token = jwt.encode(
            {
                'user_id': 'user_123',
                'exp': datetime.now(timezone.utc) + timedelta(hours=24),
            },
            app.config['SECRET_KEY'],
            # Explicit so it always matches the algorithm accepted by token_required
            algorithm='HS256',
        )
        return jsonify({'token': token}), 200
    return jsonify({'error': 'Invalid credentials'}), 401
# ===== RESOURCE ENDPOINTS =====
# Simulated database
# In-memory stand-in for a database table: product id -> product record.
products = {
    record['id']: record
    for record in (
        {'id': '1', 'name': 'Laptop', 'price': 999.99, 'stock': 10},
        {'id': '2', 'name': 'Mouse', 'price': 29.99, 'stock': 50},
    )
}
@app.route('/api/v1/products', methods=['GET'])
def get_products():
    """List products with filtering, sorting and pagination.

    GET /api/v1/products?page=1&limit=10&sort=price&order=desc&price_min=20

    Query params:
    - page: 1-based page number (default 1)
    - limit: items per page (default 10)
    - sort: one of id, name, price, stock (default id)
    - order: asc or desc (default asc)
    - price_min / price_max: inclusive price bounds

    ``category`` appears in example URLs but is not applied: the sample
    records carry no category field.

    Returns 200 with ``data``, ``metadata`` and ``links``, or 400 when
    ``page``/``limit`` are not positive or ``sort`` is not a known field.
    """
    # Pagination: type=int makes non-numeric values fall back to the default
    # instead of raising ValueError.
    page = request.args.get('page', 1, type=int)
    limit = request.args.get('limit', 10, type=int)
    if page < 1 or limit < 1:
        # limit=0 would otherwise divide by zero when computing total_pages
        return jsonify({'error': 'page and limit must be positive integers'}), 400

    # Filtering
    price_min = request.args.get('price_min', type=float)
    price_max = request.args.get('price_max', type=float)

    # Sorting: an unknown field would give every record a None key, which
    # cannot be ordered, so reject it up front.
    sort_by = request.args.get('sort', 'id')
    order = request.args.get('order', 'asc')
    if sort_by not in ('id', 'name', 'price', 'stock'):
        return jsonify({'error': f'Cannot sort by {sort_by}'}), 400

    # Apply filters; "is not None" so that a bound of 0 is honoured
    filtered_products = list(products.values())
    if price_min is not None:
        filtered_products = [p for p in filtered_products if p['price'] >= price_min]
    if price_max is not None:
        filtered_products = [p for p in filtered_products if p['price'] <= price_max]

    # Apply sorting
    filtered_products.sort(key=lambda p: p[sort_by], reverse=(order == 'desc'))

    # Slice out the requested page
    total = len(filtered_products)
    start = (page - 1) * limit
    end = start + limit
    paginated = filtered_products[start:end]

    # Response with metadata
    return jsonify({
        'data': paginated,
        'metadata': {
            'page': page,
            'limit': limit,
            'total': total,
            'total_pages': (total + limit - 1) // limit,  # ceiling division
        },
        'links': {
            'self': f'/api/v1/products?page={page}&limit={limit}',
            'next': f'/api/v1/products?page={page+1}&limit={limit}' if end < total else None,
            'prev': f'/api/v1/products?page={page-1}&limit={limit}' if page > 1 else None,
        },
    }), 200
@app.route('/api/v1/products/<product_id>', methods=['GET'])
def get_product(product_id):
    """Return a single product as JSON, or a 404 error body if it is absent."""
    found = products.get(product_id)
    if found:
        return jsonify(found), 200
    return jsonify({'error': 'Product not found'}), 404
@app.route('/api/v1/products', methods=['POST'])
@token_required
def create_product(current_user):
    """Create a product from the JSON body.

    POST /api/v1/products
    Body:
    {
        "name": "Keyboard",
        "price": 79.99,
        "stock": 30
    }

    Returns 201 with the new product and a ``Location`` header, or 400 when
    the body is not a JSON object or ``name``/``price`` is missing.
    """
    data = request.get_json(silent=True)

    # Validation: price is checked against None so that a price of 0 is accepted
    if not isinstance(data, dict) or not data.get('name') or data.get('price') is None:
        return jsonify({'error': 'Missing required fields'}), 400

    # Allocate the id after the highest numeric id in use. Counting entries
    # (len + 1) would overwrite an existing product once any lower id has
    # been deleted.
    numeric_ids = [int(key) for key in products if key.isdigit()]
    product_id = str(max(numeric_ids, default=0) + 1)

    product = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0),
    }
    products[product_id] = product

    # 201 Created with a Location header pointing at the new resource
    return jsonify(product), 201, {'Location': f'/api/v1/products/{product_id}'}
@app.route('/api/v1/products/<product_id>', methods=['PUT'])
@token_required
def update_product(current_user, product_id):
    """Replace an entire product (PUT semantics).

    Returns 200 with the stored product, 404 when the id is unknown, or 400
    when the body is not a JSON object containing ``name`` and ``price``.
    ``stock`` defaults to 0 when omitted.
    """
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404

    data = request.get_json(silent=True)
    # A full replacement needs the same required fields as creation; without
    # this check a missing key would surface as a 500.
    if not isinstance(data, dict) or not data.get('name') or data.get('price') is None:
        return jsonify({'error': 'Missing required fields'}), 400

    products[product_id] = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0),
    }
    return jsonify(products[product_id]), 200
@app.route('/api/v1/products/<product_id>', methods=['PATCH'])
@token_required
def patch_product(current_user, product_id):
    """Partially update a product: only fields present in the body change."""
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404

    data = request.json
    product = products[product_id]

    # Copy across whichever of the mutable fields the client sent
    for field in ('name', 'price', 'stock'):
        if field in data:
            product[field] = data[field]

    return jsonify(product), 200
@app.route('/api/v1/products/<product_id>', methods=['DELETE'])
@token_required
def delete_product(current_user, product_id):
    """Remove a product; 204 with an empty body on success, 404 if unknown."""
    try:
        del products[product_id]
    except KeyError:
        return jsonify({'error': 'Product not found'}), 404
    return '', 204  # No content
# ===== ERROR HANDLING =====
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default HTML page."""
    body = {
        'error': 'Not Found',
        'message': 'The requested resource was not found',
    }
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Render 500 errors as a generic JSON body (no internal details)."""
    body = {
        'error': 'Internal Server Error',
        'message': 'An unexpected error occurred',
    }
    return jsonify(body), 500
# ===== RATE LIMITING =====
from functools import wraps
from flask import g
import time
def rate_limit(max_requests=10, window=60):
    """Simple in-memory, per-client-IP sliding-window rate limiter.

    Args:
        max_requests: calls allowed per client within ``window``.
        window: length of the sliding window in seconds.

    Returns:
        A decorator for Flask views. Allowed calls get ``X-RateLimit-*``
        headers added to the view's own response; rejected calls get a 429
        JSON body plus a ``Retry-After`` header.

    The history lives in a dict captured by the decorator, so it is local to
    this process and is only pruned for a client when that client calls again.
    """
    # Local import: make_response is not part of the file-level flask import.
    from flask import make_response

    history = {}  # client ip -> timestamps of calls still inside the window

    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            client_ip = request.remote_addr
            current_time = time.time()

            # Drop timestamps that have slid out of the window
            recent = [t for t in history.get(client_ip, []) if current_time - t < window]
            history[client_ip] = recent

            # Check limit
            if len(recent) >= max_requests:
                # The oldest call in the window decides when a slot frees up
                retry_after = max(1, int(window - (current_time - recent[0])) + 1)
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': retry_after
                }), 429, {'Retry-After': str(retry_after)}

            # Record this request
            recent.append(current_time)

            # make_response normalises every return shape a view may use
            # (body, (body, status), (body, headers), (body, status, headers),
            # or a Response), so the view's own status and headers survive.
            response = make_response(f(*args, **kwargs))
            response.headers['X-RateLimit-Limit'] = str(max_requests)
            response.headers['X-RateLimit-Remaining'] = str(max_requests - len(recent))
            response.headers['X-RateLimit-Reset'] = str(int(current_time + window))
            return response
        return decorated
    return decorator
@app.route('/api/v1/limited', methods=['GET'])
@rate_limit(max_requests=5, window=60)
def limited_endpoint():
    """Sample endpoint capped at 5 requests per 60 seconds per client."""
    payload = {'message': 'This endpoint is rate limited'}
    return jsonify(payload), 200
GraphQL API:
import graphene
from graphene import ObjectType, String, Float, Int, List, Field, Schema
# ===== TYPES =====
class ProductType(graphene.ObjectType):
    """GraphQL object type exposing a product's id, name, price and stock."""

    id = graphene.String()
    name = graphene.String()
    price = graphene.Float()
    stock = graphene.Int()
class OrderItemType(graphene.ObjectType):
    """GraphQL object type for one order line: a product, a quantity and a subtotal."""

    product = graphene.Field(ProductType)
    quantity = graphene.Int()
    subtotal = graphene.Float()
class OrderType(graphene.ObjectType):
    """GraphQL object type for an order and its list of line items."""

    id = graphene.String()
    user_id = graphene.String()
    items = graphene.List(OrderItemType)
    total = graphene.Float()
    status = graphene.String()
# ===== QUERIES =====
class Query(ObjectType):
    """Root query type: single product, filtered product list, and order lookup."""

    # Single product
    product = Field(
        ProductType,
        id=String(required=True),
        description="Get product by ID"
    )

    # List products with filtering
    products = List(
        ProductType,
        category=String(),
        price_min=Float(),
        price_max=Float(),
        limit=Int(default_value=10)
    )

    # Order with nested products
    order = Field(
        OrderType,
        id=String(required=True)
    )

    def resolve_product(self, info, id):
        """Return the product stored under ``id``, or None when absent."""
        # NOTE(review): inside these methods the bare name `products` resolves
        # to the module-level store, not the class attribute above — confirm
        # both are meant to share the name.
        return products.get(id)

    def resolve_products(self, info, category=None, price_min=None, price_max=None, limit=10):
        """Return up to ``limit`` products within the inclusive price bounds.

        ``category`` is accepted by the schema but not applied here.
        """
        result = list(products.values())
        # "is not None" so that a bound of 0 is honoured rather than ignored
        if price_min is not None:
            result = [p for p in result if p['price'] >= price_min]
        if price_max is not None:
            result = [p for p in result if p['price'] <= price_max]
        return result[:limit]

    def resolve_order(self, info, id):
        """Return the order with each item expanded to its product and subtotal.

        Products are looked up one at a time per item; in production use a
        DataLoader to batch these lookups and avoid the N+1 pattern.
        Returns None when the order does not exist.
        """
        # NOTE(review): `orders` is not defined in the visible part of this
        # file — confirm where the order store comes from.
        order = orders.get(id)
        if not order:
            return None

        # Attach the product record to each line item
        enriched_items = []
        for item in order['items']:
            product = products.get(item['product_id'])
            enriched_items.append({
                'product': product,
                'quantity': item['quantity'],
                # A line whose product no longer exists has no computable
                # subtotal; return null instead of failing the whole query.
                'subtotal': product['price'] * item['quantity'] if product else None
            })

        return {
            **order,
            'items': enriched_items
        }
# ===== MUTATIONS =====
class CreateProduct(graphene.Mutation):
    """Mutation that adds a product to the in-memory store and returns it."""

    class Arguments:
        name = String(required=True)
        price = Float(required=True)
        stock = Int(default_value=0)

    product = Field(ProductType)

    def mutate(self, info, name, price, stock=0):
        """Store a new product under a fresh id and return it in the payload."""
        # Allocate the id after the highest numeric id in use; counting entries
        # (len + 1) would overwrite an existing product once a lower id has
        # been removed.
        numeric_ids = [int(key) for key in products if key.isdigit()]
        product_id = str(max(numeric_ids, default=0) + 1)
        product = {
            'id': product_id,
            'name': name,
            'price': price,
            'stock': stock
        }
        products[product_id] = product
        return CreateProduct(product=product)
class UpdateProduct(graphene.Mutation):
    """Mutation that updates the supplied fields of an existing product."""

    class Arguments:
        id = String(required=True)
        name = String()
        price = Float()
        stock = Int()

    product = Field(ProductType)

    def mutate(self, info, id, name=None, price=None, stock=None):
        """Apply every argument that was provided and return the product.

        Raises:
            Exception: when ``id`` is not in the store.
        """
        if id not in products:
            raise Exception('Product not found')
        product = products[id]
        # "is not None" on every field, so that falsy but valid values such as
        # a price of 0.0 can be set (truthiness checks silently skipped them).
        if name is not None:
            product['name'] = name
        if price is not None:
            product['price'] = price
        if stock is not None:
            product['stock'] = stock
        return UpdateProduct(product=product)
class Mutation(graphene.ObjectType):
    """Root mutation type exposing the create and update product mutations."""

    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()
# ===== SCHEMA =====
# Combine the root query and mutation types into the executable schema.
schema = Schema(query=Query, mutation=Mutation)
# ===== FLASK INTEGRATION =====
from flask import Flask
from flask_graphql import GraphQLView
# NOTE(review): this creates a fresh Flask app; if this snippet lives in the
# same module as the REST example it rebinds `app` — confirm they are meant
# to be separate files.
app = Flask(__name__)
# Serve the schema at /graphql through flask_graphql's view class.
app.add_url_rule(
    '/graphql',
    view_func=GraphQLView.as_view(
        'graphql',
        schema=schema,
        graphiql=True  # GraphiQL IDE
    )
)
# Ejemplo de query GraphQL:
"""
query GetProducts {
products(priceMin: 20, priceMax: 100, limit: 5) {
id
name
price
stock
}
}
query GetOrder {
order(id: "1") {
id
total
items {
product {
name
price
}
quantity
subtotal
}
}
}
mutation CreateProduct {
createProduct(name: "Monitor", price: 299.99, stock: 15) {
product {
id
name
price
}
}
}
"""
gRPC con Protocol Buffers:
// products.proto
// Schema for the product service: four RPCs and the messages they exchange.
syntax = "proto3";
package products;

// Product catalogue operations plus a server-streaming order feed.
service ProductService {
  // Unary: look up one product by id.
  rpc GetProduct (ProductRequest) returns (Product);
  // Unary: page through products, optionally bounded by price.
  rpc ListProducts (ListProductsRequest) returns (ProductList);
  // Unary: create a product and return it.
  rpc CreateProduct (CreateProductRequest) returns (Product);
  // Server streaming: one request, a stream of Order messages back.
  rpc StreamOrders (StreamOrdersRequest) returns (stream Order);
}

// A catalogue entry.
message Product {
  string id = 1;
  string name = 2;
  double price = 3;
  int32 stock = 4;
}

// Identifies the product to fetch.
message ProductRequest {
  string id = 1;
}

// Paging and price-range parameters for ListProducts.
message ListProductsRequest {
  int32 page = 1;
  int32 limit = 2;
  double price_min = 3;
  double price_max = 4;
}

// One page of products plus a total count.
message ProductList {
  repeated Product products = 1;
  int32 total = 2;
}

// Fields for a new product.
message CreateProductRequest {
  string name = 1;
  double price = 2;
  int32 stock = 3;
}

// Order summary emitted by StreamOrders.
message Order {
  string id = 1;
  string user_id = 2;
  double total = 3;
  string status = 4;
}

// Identifies the user whose orders are streamed.
message StreamOrdersRequest {
  string user_id = 1;
}
# Generar código Python: python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. products.proto
import grpc
from concurrent import futures
import products_pb2
import products_pb2_grpc
import time
# ===== SERVER =====
class ProductServicer(products_pb2_grpc.ProductServiceServicer):
    """gRPC implementation of ProductService backed by the ``products`` dict."""

    # Page size used when the request leaves ``limit`` at its proto3 default (0)
    DEFAULT_PAGE_SIZE = 10

    @staticmethod
    def _to_message(p):
        """Convert a product record (dict) into a ``Product`` message."""
        return products_pb2.Product(
            id=p['id'],
            name=p['name'],
            price=p['price'],
            stock=p['stock']
        )

    def GetProduct(self, request, context):
        """Unary RPC: 1 request → 1 response.

        Sets NOT_FOUND on the context and returns an empty ``Product`` when
        the id is unknown.
        """
        product_id = request.id
        if product_id not in products:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('Product not found')
            return products_pb2.Product()
        return self._to_message(products[product_id])

    def ListProducts(self, request, context):
        """Unary RPC with price filters and pagination.

        A price bound is applied only when it is greater than 0. ``page`` and
        ``limit`` fall back to 1 and ``DEFAULT_PAGE_SIZE`` when unset or
        non-positive.
        """
        filtered = list(products.values())
        if request.price_min > 0:
            filtered = [p for p in filtered if p['price'] >= request.price_min]
        if request.price_max > 0:
            filtered = [p for p in filtered if p['price'] <= request.price_max]

        # Unset int32 fields arrive as 0; using them directly would produce a
        # negative start index or an empty slice.
        page = request.page if request.page > 0 else 1
        limit = request.limit if request.limit > 0 else self.DEFAULT_PAGE_SIZE
        start = (page - 1) * limit
        paginated = filtered[start:start + limit]

        return products_pb2.ProductList(
            products=[self._to_message(p) for p in paginated],
            total=len(filtered)
        )

    def CreateProduct(self, request, context):
        """Unary RPC that stores a new product and returns it."""
        # Allocate the id after the highest numeric id in use; counting entries
        # (len + 1) could overwrite an existing product after a removal.
        numeric_ids = [int(key) for key in products if key.isdigit()]
        product_id = str(max(numeric_ids, default=0) + 1)
        product = {
            'id': product_id,
            'name': request.name,
            'price': request.price,
            'stock': request.stock
        }
        products[product_id] = product
        return self._to_message(product)

    def StreamOrders(self, request, context):
        """Server-streaming RPC: 1 request → N responses.

        Yields five simulated orders for the requested user, one per second,
        stopping early once the call is no longer active.
        """
        user_id = request.user_id
        for i in range(5):
            if not context.is_active():
                # The client cancelled or disconnected: stop producing
                return
            time.sleep(1)  # Simulated delay between orders
            yield products_pb2.Order(
                id=f"order_{i}",
                user_id=user_id,
                total=99.99 * (i + 1),
                status='processing'
            )
def serve():
    """Start the gRPC server on port 50051 and block until it terminates."""
    pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(pool)
    servicer = ProductServicer()
    products_pb2_grpc.add_ProductServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port('[::]:50051')
    grpc_server.start()
    print("gRPC server started on port 50051")
    grpc_server.wait_for_termination()
# ===== CLIENT =====
def run_client():
    """Exercise every ProductService RPC against localhost:50051 and print the results."""
    # The context manager closes the channel when the demo finishes; the
    # original left it open.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = products_pb2_grpc.ProductServiceStub(channel)

        # GetProduct
        response = stub.GetProduct(products_pb2.ProductRequest(id='1'))
        print(f"Product: {response.name}, ${response.price}")

        # ListProducts
        response = stub.ListProducts(products_pb2.ListProductsRequest(
            page=1,
            limit=10,
            price_min=20.0,
            price_max=100.0
        ))
        print(f"Found {response.total} products:")
        for product in response.products:
            print(f" - {product.name}: ${product.price}")

        # CreateProduct
        new_product = stub.CreateProduct(products_pb2.CreateProductRequest(
            name='Monitor',
            price=299.99,
            stock=15
        ))
        print(f"Created product: {new_product.id}")

        # StreamOrders (server streaming): iterate responses as they arrive
        print("\nStreaming orders:")
        for order in stub.StreamOrders(products_pb2.StreamOrdersRequest(user_id='user_123')):
            print(f" Order {order.id}: ${order.total} - {order.status}")
if __name__ == '__main__':
    # Run the server in a background daemon thread, then run the client
    import threading
    server_thread = threading.Thread(target=serve, daemon=True)
    server_thread.start()
    time.sleep(2)  # wait before the client's first call
    run_client()
Cuándo usar cada uno:
REST:
- ✅ APIs públicas
- ✅ CRUD simple
- ✅ Caching HTTP importante
- ✅ Equipo sin experiencia con GraphQL/gRPC
GraphQL:
- ✅ Frontend necesita flexibilidad en queries
- ✅ Múltiples clientes con necesidades diferentes
- ✅ Evitar over/under-fetching
- ❌ APIs públicas simples (REST es más simple)
gRPC:
- ✅ Microservices internos
- ✅ Rendimiento crítico
- ✅ Streaming bidireccional
- ✅ Polyglot environments (multi-lenguaje)
- ❌ Browsers (limitado, necesita gRPC-Web)
Receta 2.9: Resiliencia - Circuit Breakers y Retry
Circuit Breaker Pattern:
from enum import Enum
from datetime import datetime, timedelta
import time
import random
class CircuitState(Enum):
    """Operating states of a CircuitBreaker."""
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Failing: requests are rejected
    HALF_OPEN = "half_open"  # Probing: trial requests are allowed


class CircuitBreaker:
    """Circuit breaker that guards a callable against cascading failures.

    States:
    - CLOSED: normal operation; consecutive failures are counted.
    - OPEN: too many failures; calls are rejected without running.
    - HALF_OPEN: after ``timeout`` seconds, trial calls are let through; any
      failure re-opens the circuit, ``success_threshold`` consecutive
      successes close it.
    """

    def __init__(
        self,
        failure_threshold=5,   # Consecutive failures before opening
        timeout=60,            # Seconds to stay OPEN before probing
        success_threshold=2    # Successes needed to close from HALF_OPEN
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: immediately, without calling ``func``, while the
                circuit is OPEN and the timeout has not elapsed.
            Exception: any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        # When OPEN, either start probing (HALF_OPEN) or reject outright
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                # Every recovery attempt starts its success tally from zero
                self.success_count = 0
                print("🔄 Circuit breaker: OPEN → HALF_OPEN (attempting recovery)")
            else:
                raise Exception(f"Circuit breaker is OPEN (retry in {self._time_until_retry()}s)")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            print(f"✓ Success in HALF_OPEN ({self.success_count}/{self.success_threshold})")
            if self.success_count >= self.success_threshold:
                self._reset()
        else:
            # In CLOSED, a success clears the failure streak
            self.failure_count = 0

    def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        self.last_failure_time = datetime.now()

        # HALF_OPEN must be handled first: failure_count is already at the
        # threshold in this state, so the threshold check below would always
        # win, print the wrong transition and leave success_count stale.
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
            print("⚠️ Circuit breaker: HALF_OPEN → OPEN (failure during recovery)")
            return

        self.failure_count += 1
        print(f"✗ Failure ({self.failure_count}/{self.failure_threshold})")
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"⚠️ Circuit breaker: CLOSED → OPEN (too many failures)")

    def _reset(self):
        """Return to CLOSED and clear both counters."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        print("✓ Circuit breaker: HALF_OPEN → CLOSED (recovered)")

    def _should_attempt_reset(self):
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if not self.last_failure_time:
            return True
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout

    def _time_until_retry(self):
        """Return the seconds remaining before a recovery attempt is allowed."""
        if not self.last_failure_time:
            return 0
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return max(0, self.timeout - elapsed)
# ===== RETRY CON EXPONENTIAL BACKOFF =====
def retry_with_backoff(
    func,
    max_retries=3,
    initial_delay=1,
    max_delay=60,
    backoff_factor=2,
    exceptions=(Exception,),
    sleep=None
):
    """Call ``func()`` and retry with exponential backoff plus jitter.

    With the defaults the base delays are 1s, 2s, 4s, ... capped at
    ``max_delay``; each wait gets up to 10% extra random jitter so that many
    clients do not retry in lockstep (thundering herd).

    Args:
        func: zero-argument callable to invoke.
        max_retries: retries after the first attempt (total attempts is
            ``max_retries + 1``).
        initial_delay: base delay in seconds before the first retry.
        max_delay: upper bound for the base delay.
        backoff_factor: multiplier applied to the delay after each retry.
        exceptions: exception types that trigger a retry; others propagate.
        sleep: callable used to wait; defaults to ``time.sleep``.

    Returns:
        The first successful result of ``func()``.

    Raises:
        The last exception raised by ``func`` once all attempts are used up.
    """
    if sleep is None:
        sleep = time.sleep
    delay = min(initial_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            return func()
        except exceptions as e:
            if attempt == max_retries:
                print(f"❌ Failed after {max_retries + 1} attempts")
                raise
            # Wait using the current delay, and grow it only afterwards, so
            # that the first retry really waits initial_delay.
            jitter = random.uniform(0, delay * 0.1)  # 10% jitter
            sleep_time = delay + jitter
            print(f"⚠️ Attempt {attempt + 1} failed: {e}")
            print(f" Retrying in {sleep_time:.2f}s...")
            sleep(sleep_time)
            delay = min(delay * backoff_factor, max_delay)
# ===== DEMOSTRACIÓN =====
def unreliable_service():
    """Simulate a service that fails on roughly 70% of calls."""
    roll = random.random()
    if roll >= 0.7:
        return "Success!"
    raise Exception("Service unavailable")
# Demo: drive the breaker with the randomly failing service.
print("=== Circuit Breaker Demo ===\n")
cb = CircuitBreaker(
    failure_threshold=3,
    timeout=5,
    success_threshold=2
)
# Make 20 calls, one per second, printing each outcome
for i in range(20):
    try:
        result = cb.call(unreliable_service)
        print(f"✓ Request {i+1}: {result}\n")
    except Exception as e:
        # Both service failures and open-circuit rejections are caught here
        print(f"✗ Request {i+1}: {e}\n")
    time.sleep(1)
# Demo: a service that recovers on its third call, wrapped in retry_with_backoff.
print("\n=== Retry with Backoff Demo ===\n")
# One-element list so the nested function can mutate the counter in place
attempt_count = [0]
def flaky_service():
    """Service that fails on its first 2 calls and then succeeds."""
    attempt_count[0] += 1
    if attempt_count[0] < 3:
        raise Exception(f"Temporary failure (attempt {attempt_count[0]})")
    return "Success after retries!"
try:
    result = retry_with_backoff(
        flaky_service,
        max_retries=5,
        initial_delay=0.5,
        backoff_factor=2
    )
    print(f"\n✓ Final result: {result}")
except Exception as e:
    print(f"\n❌ Final error: {e}")
Timeout y Fallback:
import signal
from contextlib import contextmanager
class TimeoutError(Exception):
    """Raised when an operation exceeds its time budget.

    Defined at module level under the same name as the built-in
    ``TimeoutError``, which it therefore shadows inside this module.
    """
@contextmanager
def timeout(seconds):
    """Context manager that aborts its body after ``seconds`` seconds.

    Uses the SIGALRM interval timer, so whole and fractional seconds both
    work; ``signal.alarm`` would reject a float. The previous SIGALRM handler
    is restored on exit.

    Raises:
        TimeoutError: inside the body when the time limit is reached.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")

    # Install our handler, remembering the previous one
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # Armed inside the try so the old handler is restored even if arming fails
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending timer
        signal.signal(signal.SIGALRM, old_handler)
def call_with_fallback(primary_func, fallback_func, timeout_seconds=5):
    """Run ``primary_func`` under a time limit; use ``fallback_func`` on any failure.

    Both a timeout and any other exception from the primary call trigger the
    fallback, whose result is returned instead.
    """
    try:
        with timeout(timeout_seconds):
            outcome = primary_func()
    except Exception as e:  # the file's TimeoutError is itself an Exception subclass
        print(f"⚠️ Primary failed: {e}, using fallback")
        outcome = fallback_func()
    return outcome
# Uso
def slow_external_api():
    """Simulate a very slow upstream call: sleep 10 seconds, then return data."""
    delay_seconds = 10  # Deliberately far too slow
    time.sleep(delay_seconds)
    payload = {"data": "from API"}
    return payload
def cache_fallback():
    """Return the stale cached payload used when the primary call fails."""
    stale = {"data": "from cache (stale)"}
    return stale
# The primary call sleeps 10s but is given a 2s limit, so call_with_fallback
# is expected to take the fallback path here.
result = call_with_fallback(
    slow_external_api,
    cache_fallback,
    timeout_seconds=2
)
print(f"Result: {result}")
¡Felicidades! 🎉
Has completado la FASE 2: Arquitectura y Diseño de Sistemas del roadmap.
Lo que has aprendido:
✅ Monolitos vs Microservicios
✅ Event-Driven Architecture y CQRS
✅ CAP Theorem y consistencia eventual
✅ Load Balancing y caching strategies
✅ SQL avanzado y optimización
✅ NoSQL (MongoDB, Redis)
✅ REST vs GraphQL vs gRPC
✅ Resiliencia con Circuit Breakers
Próximos pasos:
FASE 3: Infraestructura y DevOps
- Contenedores y Kubernetes
- Cloud Computing
- CI/CD Pipelines
- Infraestructura como Código
Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo