🏗️ FASE 2: Arquitectura y Diseño de Sistemas

Recetas prácticas para diseñar sistemas escalables y robustos


📚 Tabla de Contenidos

  1. Receta 2.8: REST vs GraphQL vs gRPC
  2. Receta 2.9: Resiliencia - Circuit Breakers y Retry

APIs y Comunicación entre Servicios

Receta 2.8: REST vs GraphQL vs gRPC

Comparación:

| Característica | REST | GraphQL | gRPC |
|---|---|---|---|
| Protocolo | HTTP/JSON | HTTP/JSON | HTTP/2 + Protobuf |
| Queries | Múltiples endpoints | Single endpoint | RPC calls |
| Over/Under-fetching | ❌ Común | ✅ Resuelve | ✅ Evita |
| Performance | 🟡 Media | 🟡 Media | 🟢 Alta |
| Tipado | ❌ Débil | ✅ Strong | ✅ Strong |
| Streaming | ❌ Difícil | ⚠️ Subscriptions | ✅ Bidireccional |
| Caching | ✅ HTTP cache | ❌ Complejo | ❌ Custom |
| Learning curve | 🟢 Fácil | 🟡 Media | 🔴 Alta |

REST API completo:

from flask import Flask, request, jsonify
from functools import wraps
import jwt
from datetime import datetime, timedelta

# Flask application that the REST endpoints below register their routes on.
app = Flask(__name__)
# Key used to sign and verify the JWTs handled in this section.
# NOTE(review): hard-coded placeholder value; presumably meant to come from
# configuration or the environment outside of a demo — confirm before reuse.
app.config['SECRET_KEY'] = 'your-secret-key'

# ===== AUTHENTICATION & AUTHORIZATION =====

def token_required(f):
    """Decorator that protects an endpoint with a JWT bearer token.

    Expects a header of the form ``Authorization: Bearer <jwt>``. The wrapped
    view is called with the token's ``user_id`` claim as its first positional
    argument, followed by the original arguments.

    Responds with 401 and a JSON error body when the header is missing or the
    token cannot be validated.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')

        if not auth_header:
            return jsonify({'error': 'Token missing'}), 401

        try:
            # Header value looks like: "Bearer eyJhbGc..."
            token = auth_header.split(' ')[1]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']
        except (IndexError, KeyError, jwt.InvalidTokenError):
            # IndexError: nothing after the scheme; KeyError: no user_id claim;
            # InvalidTokenError: malformed, expired or badly signed token.
            # Anything else is a server-side problem and must not be masked as 401.
            return jsonify({'error': 'Token invalid'}), 401

        return f(current_user, *args, **kwargs)

    return decorated

@app.route('/auth/login', methods=['POST'])
def login():
    """Check the submitted credentials and issue a JWT valid for 24 hours.

    Expects a JSON object with ``email`` and ``password``.

    Returns ``{'token': ...}`` with 200 on success, or an error body with 401
    when the credentials do not match or the body is missing / not a JSON
    object.
    """
    # silent=True returns None instead of raising on a missing or malformed body,
    # so an empty request is treated as "no credentials" rather than crashing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Credential check (simplified for the example)
    if data.get('email') == 'user@example.com' and data.get('password') == 'password':
        token = jwt.encode({
            'user_id': 'user_123',
            'exp': datetime.utcnow() + timedelta(hours=24)
        }, app.config['SECRET_KEY'], algorithm='HS256')  # same algorithm token_required accepts

        return jsonify({'token': token}), 200

    return jsonify({'error': 'Invalid credentials'}), 401

# ===== RESOURCE ENDPOINTS =====

# Simulated database: in-memory dict mapping a product id (string) to its
# record. Every endpoint in this section reads and mutates it directly.
products = {
    '1': {'id': '1', 'name': 'Laptop', 'price': 999.99, 'stock': 10},
    '2': {'id': '2', 'name': 'Mouse', 'price': 29.99, 'stock': 50}
}

@app.route('/api/v1/products', methods=['GET'])
def get_products():
    """
    GET /api/v1/products?page=1&limit=10&sort=price&order=desc&category=electronics

    Query params:
    - page: 1-based page number (default 1)
    - limit: items per page (default 10)
    - sort: product field to sort by: id, name, price or stock (default id)
    - order: asc or desc (default asc)
    - filters: category (exact match), price_min, price_max (inclusive bounds)

    Returns 200 with ``data`` (the page), ``metadata`` (paging totals) and
    ``links`` (self/next/prev). Returns 400 when page/limit are not positive
    integers or ``sort`` names an unknown field.
    """
    # Pagination: reject bad values up front instead of failing with a 500
    # (int() on garbage) or a ZeroDivisionError (limit=0) further down.
    try:
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 10))
    except ValueError:
        return jsonify({'error': 'page and limit must be integers'}), 400
    if page < 1 or limit < 1:
        return jsonify({'error': 'page and limit must be positive'}), 400

    # Filtering
    category = request.args.get('category')
    price_min = request.args.get('price_min', type=float)
    price_max = request.args.get('price_max', type=float)

    # Sorting: only known fields, otherwise the sort key would be None for
    # every record and comparing None values raises TypeError.
    sort_by = request.args.get('sort', 'id')
    if sort_by not in ('id', 'name', 'price', 'stock'):
        return jsonify({'error': f'Cannot sort by {sort_by}'}), 400
    order = request.args.get('order', 'asc')

    # Apply filters ("is not None" so that a bound of 0 is still honoured)
    filtered_products = list(products.values())

    if category is not None:
        filtered_products = [p for p in filtered_products if p.get('category') == category]
    if price_min is not None:
        filtered_products = [p for p in filtered_products if p['price'] >= price_min]
    if price_max is not None:
        filtered_products = [p for p in filtered_products if p['price'] <= price_max]

    # Apply sorting
    filtered_products.sort(
        key=lambda x: x.get(sort_by),
        reverse=(order == 'desc')
    )

    # Pagination window
    total = len(filtered_products)
    start = (page - 1) * limit
    end = start + limit
    paginated = filtered_products[start:end]

    # Response with metadata
    return jsonify({
        'data': paginated,
        'metadata': {
            'page': page,
            'limit': limit,
            'total': total,
            'total_pages': (total + limit - 1) // limit
        },
        'links': {
            'self': f'/api/v1/products?page={page}&limit={limit}',
            'next': f'/api/v1/products?page={page+1}&limit={limit}' if end < total else None,
            'prev': f'/api/v1/products?page={page-1}&limit={limit}' if page > 1 else None
        }
    }), 200

@app.route('/api/v1/products/<product_id>', methods=['GET'])
def get_product(product_id):
    """Return the product stored under ``product_id``, or a 404 JSON error."""
    found = products.get(product_id)
    if found:
        return jsonify(found), 200
    return jsonify({'error': 'Product not found'}), 404

@app.route('/api/v1/products', methods=['POST'])
@token_required
def create_product(current_user):
    """
    POST /api/v1/products

    Body:
    {
        "name": "Keyboard",
        "price": 79.99,
        "stock": 30
    }

    Returns the created product with 201 and a Location header, or 400 when
    the body is not a JSON object or lacks ``name`` / ``price``.
    """
    data = request.get_json(silent=True)

    # Validation: a price of 0 is a valid value, only an absent price is rejected
    if not isinstance(data, dict) or not data.get('name') or data.get('price') is None:
        return jsonify({'error': 'Missing required fields'}), 400

    # Next id = highest numeric id in use + 1. len(products) + 1 would hand out
    # an id that is still in use once any earlier product has been deleted.
    highest = max((int(pid) for pid in products if pid.isdigit()), default=0)
    product_id = str(highest + 1)
    product = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0)
    }
    products[product_id] = product

    # Return 201 Created with Location header
    return jsonify(product), 201, {'Location': f'/api/v1/products/{product_id}'}

@app.route('/api/v1/products/<product_id>', methods=['PUT'])
@token_required
def update_product(current_user, product_id):
    """PUT (replace entire resource).

    Returns the replaced product with 200, 404 when the id is unknown, or 400
    when the body is not a JSON object containing ``name`` and ``price``.
    """
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404

    data = request.get_json(silent=True)
    # A full replacement needs every mandatory field; answer 400 instead of
    # letting a KeyError surface as a 500.
    if not isinstance(data, dict) or 'name' not in data or 'price' not in data:
        return jsonify({'error': 'Missing required fields'}), 400

    products[product_id] = {
        'id': product_id,
        'name': data['name'],
        'price': data['price'],
        'stock': data.get('stock', 0)
    }

    return jsonify(products[product_id]), 200

@app.route('/api/v1/products/<product_id>', methods=['PATCH'])
@token_required
def patch_product(current_user, product_id):
    """PATCH (partial update).

    Only ``name``, ``price`` and ``stock`` are updatable; other keys in the
    body are ignored. Returns the updated product with 200, 404 when the id is
    unknown, or 400 when the body is not a JSON object.
    """
    if product_id not in products:
        return jsonify({'error': 'Product not found'}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # "'name' in None" would raise TypeError and surface as a 500
        return jsonify({'error': 'Invalid JSON body'}), 400

    product = products[product_id]

    # Update only provided fields
    for field in ('name', 'price', 'stock'):
        if field in data:
            product[field] = data[field]

    return jsonify(product), 200

@app.route('/api/v1/products/<product_id>', methods=['DELETE'])
@token_required
def delete_product(current_user, product_id):
    """Remove the product with the given id; 204 on success, 404 if unknown."""
    if product_id in products:
        products.pop(product_id)
        return '', 204  # No content
    return jsonify({'error': 'Product not found'}), 404

# ===== ERROR HANDLING =====

@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body instead of the default page."""
    payload = {
        'error': 'Not Found',
        'message': 'The requested resource was not found',
    }
    return jsonify(payload), 404

@app.errorhandler(500)
def internal_error(error):
    """Answer 500 errors with a generic JSON body."""
    payload = {
        'error': 'Internal Server Error',
        'message': 'An unexpected error occurred',
    }
    return jsonify(payload), 500

# ===== RATE LIMITING =====

from functools import wraps
from flask import g
import time

def rate_limit(max_requests=10, window=60):
    """Simple in-memory sliding-window rate limiter keyed by client IP.

    Args:
        max_requests: number of requests a client may make within ``window``.
        window: length of the window in seconds.

    Returns:
        A decorator. The wrapped view answers 429 (with ``retry_after`` in the
        body and a ``Retry-After`` header) once the client has used up its
        quota; otherwise the view's own response is returned with
        ``X-RateLimit-*`` headers added.

    The timestamps live in a dict owned by this call, so the limit applies per
    decorated view and per process; entries are never removed for clients that
    stop calling.
    """
    # Local imports: neither name is among the file's top-level imports.
    import math
    from flask import make_response

    hits = {}  # client ip -> timestamps of its requests still inside the window

    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            client_ip = request.remote_addr
            current_time = time.time()

            # Drop timestamps that have fallen out of the window
            recent = [t for t in hits.get(client_ip, ()) if current_time - t < window]
            hits[client_ip] = recent

            # Check limit
            if len(recent) >= max_requests:
                # A slot frees up when the oldest recorded request expires,
                # which is usually sooner than a full window from now.
                oldest = recent[0] if recent else current_time
                retry_after = max(1, math.ceil(oldest + window - current_time))
                rejected = make_response(jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': retry_after
                }), 429)
                rejected.headers['Retry-After'] = str(retry_after)
                return rejected

            # Record this request
            recent.append(current_time)

            # make_response normalises every return shape a view may use
            # (body, (body, status), (body, headers), (body, status, headers),
            # Response), so the view's status and headers are preserved.
            response = make_response(f(*args, **kwargs))
            response.headers['X-RateLimit-Limit'] = str(max_requests)
            response.headers['X-RateLimit-Remaining'] = str(max_requests - len(recent))
            response.headers['X-RateLimit-Reset'] = str(int(current_time + window))
            return response

        return decorated
    return decorator

@app.route('/api/v1/limited', methods=['GET'])
@rate_limit(max_requests=5, window=60)
def limited_endpoint():
    """Example endpoint limited to 5 requests per 60-second window per client."""
    return jsonify({'message': 'This endpoint is rate limited'}), 200

GraphQL API:

import graphene
from graphene import ObjectType, String, Float, Int, List, Field, Schema

# ===== TYPES =====

# GraphQL object type for a product; the fields mirror the keys of the
# records in the in-memory `products` dict.
class ProductType(ObjectType):
    id = String()
    name = String()
    price = Float()
    stock = Int()

# GraphQL object type for one line of an order: the product, how many units,
# and the line subtotal.
class OrderItemType(ObjectType):
    product = Field(ProductType)
    quantity = Int()
    subtotal = Float()

# GraphQL object type for an order with its nested line items.
class OrderType(ObjectType):
    id = String()
    user_id = String()
    items = List(OrderItemType)
    total = Float()
    status = String()

# ===== QUERIES =====

# Root query type: single product lookup, filtered product listing and
# order lookup with nested products.
class Query(ObjectType):
    # Single product
    product = Field(
        ProductType,
        id=String(required=True),
        description="Get product by ID"
    )

    # List products with filtering
    products = List(
        ProductType,
        category=String(),
        price_min=Float(),
        price_max=Float(),
        limit=Int(default_value=10)
    )

    # Order with nested products
    order = Field(
        OrderType,
        id=String(required=True)
    )

    def resolve_product(self, info, id):
        """Resolver for a single product; returns None when the id is unknown."""
        return products.get(id)

    def resolve_products(self, info, category=None, price_min=None, price_max=None, limit=10):
        """Resolver for the product list.

        Applies the optional category (exact match) and inclusive price
        filters, then returns at most ``limit`` products.
        """
        result = list(products.values())

        # "is not None" so that a bound of 0 is applied rather than ignored
        if category is not None:
            result = [p for p in result if p.get('category') == category]
        if price_min is not None:
            result = [p for p in result if p['price'] >= price_min]
        if price_max is not None:
            result = [p for p in result if p['price'] <= price_max]

        # A negative limit would otherwise slice from the end of the list
        return result[:max(limit, 0)]

    def resolve_order(self, info, id):
        """Resolver for an order, with each item's product loaded inline.

        Products are fetched one by one per item (the N+1 pattern); in
        production a DataLoader should batch these lookups.
        """
        # NOTE(review): `orders` is not defined in the visible part of this
        # file; it is presumably an in-memory dict like `products` — confirm.
        order = orders.get(id)
        if not order:
            return None

        # Load the product behind every order item
        enriched_items = []
        for item in order['items']:
            product = products.get(item['product_id'])
            quantity = item['quantity']
            enriched_items.append({
                'product': product,
                'quantity': quantity,
                # The product may have been deleted since the order was placed
                'subtotal': product['price'] * quantity if product else None
            })

        return {
            **order,
            'items': enriched_items
        }

# ===== MUTATIONS =====

# Mutation that adds a product to the in-memory `products` dict and returns it.
class CreateProduct(graphene.Mutation):
    class Arguments:
        name = String(required=True)
        price = Float(required=True)
        stock = Int(default_value=0)

    product = Field(ProductType)

    def mutate(self, info, name, price, stock=0):
        """Store a new product under a fresh id and return it in the payload."""
        # Next id = highest numeric id in use + 1. len(products) + 1 would hand
        # out an id that is still in use once any product has been deleted.
        highest = max((int(pid) for pid in products if pid.isdigit()), default=0)
        product_id = str(highest + 1)
        product = {
            'id': product_id,
            'name': name,
            'price': price,
            'stock': stock
        }
        products[product_id] = product
        return CreateProduct(product=product)

# Mutation that partially updates an existing product.
class UpdateProduct(graphene.Mutation):
    class Arguments:
        id = String(required=True)
        name = String()
        price = Float()
        stock = Int()

    product = Field(ProductType)

    def mutate(self, info, id, name=None, price=None, stock=None):
        """Update only the fields that were supplied.

        Raises:
            Exception: when no product exists under ``id``.
        """
        if id not in products:
            raise Exception('Product not found')

        product = products[id]
        # Compare against None for every field: a truthiness test would make
        # it impossible to set a price of 0, unlike the stock check.
        if name is not None:
            product['name'] = name
        if price is not None:
            product['price'] = price
        if stock is not None:
            product['stock'] = stock

        return UpdateProduct(product=product)

# Root mutation type exposing the two product mutations defined above it.
class Mutation(ObjectType):
    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()

# ===== SCHEMA =====

# Executable schema combining the root Query and Mutation types.
schema = Schema(query=Query, mutation=Mutation)

# ===== FLASK INTEGRATION =====

from flask import Flask
from flask_graphql import GraphQLView

# NOTE(review): this rebinds the module-level name `app`. If this snippet runs
# in the same module as the REST section, the routes registered there stay on
# the earlier Flask instance — confirm the snippets are meant to be separate.
app = Flask(__name__)
# Serve the schema at /graphql through flask_graphql's view class.
app.add_url_rule(
    '/graphql',
    view_func=GraphQLView.as_view(
        'graphql',
        schema=schema,
        graphiql=True  # GraphiQL IDE
    )
)

# Ejemplo de query GraphQL:
"""
query GetProducts {
  products(priceMin: 20, priceMax: 100, limit: 5) {
    id
    name
    price
    stock
  }
}

query GetOrder {
  order(id: "1") {
    id
    total
    items {
      product {
        name
        price
      }
      quantity
      subtotal
    }
  }
}

mutation CreateProduct {
  createProduct(name: "Monitor", price: 299.99, stock: 15) {
    product {
      id
      name
      price
    }
  }
}
"""

gRPC con Protocol Buffers:

// products.proto
syntax = "proto3";

package products;

// Product service: three unary RPCs plus one server-streaming RPC
// (StreamOrders returns a stream of Order messages).
service ProductService {
  rpc GetProduct (ProductRequest) returns (Product);
  rpc ListProducts (ListProductsRequest) returns (ProductList);
  rpc CreateProduct (CreateProductRequest) returns (Product);
  rpc StreamOrders (StreamOrdersRequest) returns (stream Order);
}

// A product as returned by GetProduct, ListProducts and CreateProduct.
message Product {
  string id = 1;
  string name = 2;
  double price = 3;
  int32 stock = 4;
}

// Request for GetProduct: the id of the product to fetch.
message ProductRequest {
  string id = 1;
}

// Request for ListProducts: pagination plus optional price bounds.
// proto3 scalar fields read as 0 when unset, so a value of 0 cannot be told
// apart from "not provided"; the Python server applies a price bound only
// when it is greater than 0.
message ListProductsRequest {
  int32 page = 1;
  int32 limit = 2;
  double price_min = 3;
  double price_max = 4;
}

// Response for ListProducts: one page of products plus a total count
// (the Python server sets total to the number of matches before pagination).
message ProductList {
  repeated Product products = 1;
  int32 total = 2;
}

// Request for CreateProduct: the fields of the new product (no id is sent).
message CreateProductRequest {
  string name = 1;
  double price = 2;
  int32 stock = 3;
}

// An order as streamed by StreamOrders.
message Order {
  string id = 1;
  string user_id = 2;
  double total = 3;
  string status = 4;
}

// Request for StreamOrders: the user whose orders should be streamed.
message StreamOrdersRequest {
  string user_id = 1;
}
# Generar código Python: python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. products.proto

import grpc
from concurrent import futures
import products_pb2
import products_pb2_grpc
import time

# ===== SERVER =====

class ProductServicer(products_pb2_grpc.ProductServiceServicer):
    """gRPC service implementation backed by the in-memory ``products`` dict."""

    # Page size used when the request leaves `limit` unset (proto3 reads it as 0)
    _DEFAULT_LIMIT = 10

    @staticmethod
    def _to_message(p):
        """Convert a product dict into a ``products_pb2.Product`` message."""
        return products_pb2.Product(
            id=p['id'],
            name=p['name'],
            price=p['price'],
            stock=p['stock']
        )

    def GetProduct(self, request, context):
        """Unary RPC: 1 request → 1 response.

        Sets NOT_FOUND on the context and returns an empty Product when the
        id is unknown.
        """
        product_id = request.id

        if product_id not in products:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('Product not found')
            return products_pb2.Product()

        return self._to_message(products[product_id])

    def ListProducts(self, request, context):
        """Unary RPC with price filters and pagination.

        A price bound is applied only when it is greater than 0, because an
        unset proto3 double reads as 0. ``total`` is the number of matches
        before pagination.
        """
        filtered = list(products.values())

        if request.price_min > 0:
            filtered = [p for p in filtered if p['price'] >= request.price_min]
        if request.price_max > 0:
            filtered = [p for p in filtered if p['price'] <= request.price_max]

        # Pagination: unset page/limit arrive as 0. Without these defaults
        # page=0 yields a negative slice start and limit=0 an empty page.
        page = request.page if request.page > 0 else 1
        limit = request.limit if request.limit > 0 else self._DEFAULT_LIMIT
        start = (page - 1) * limit
        paginated = filtered[start:start + limit]

        return products_pb2.ProductList(
            products=[self._to_message(p) for p in paginated],
            total=len(filtered)
        )

    def CreateProduct(self, request, context):
        """Unary RPC that stores a new product and returns it."""
        # Next id = highest numeric id in use + 1. len(products) + 1 would hand
        # out an id that is still in use once any product has been deleted.
        highest = max((int(pid) for pid in products if pid.isdigit()), default=0)
        product_id = str(highest + 1)
        product = {
            'id': product_id,
            'name': request.name,
            'price': request.price,
            'stock': request.stock
        }
        products[product_id] = product

        return self._to_message(product)

    def StreamOrders(self, request, context):
        """Server-streaming RPC: 1 request → N responses."""
        user_id = request.user_id

        # Simulate a stream of orders
        for i in range(5):
            time.sleep(1)  # Simulated delay between orders
            yield products_pb2.Order(
                id=f"order_{i}",
                user_id=user_id,
                total=99.99 * (i + 1),
                status='processing'
            )

def serve():
    """Start the gRPC server on port 50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    servicer = ProductServicer()
    products_pb2_grpc.add_ProductServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port('[::]:50051')
    grpc_server.start()
    print("gRPC server started on port 50051")
    grpc_server.wait_for_termination()

# ===== CLIENT =====

def run_client():
    """gRPC client demo: calls every RPC of ProductService once and prints the results."""
    # The channel is a context manager; leaving the block closes the
    # connection instead of leaking it until interpreter exit.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = products_pb2_grpc.ProductServiceStub(channel)

        # GetProduct
        response = stub.GetProduct(products_pb2.ProductRequest(id='1'))
        print(f"Product: {response.name}, ${response.price}")

        # ListProducts
        response = stub.ListProducts(products_pb2.ListProductsRequest(
            page=1,
            limit=10,
            price_min=20.0,
            price_max=100.0
        ))
        print(f"Found {response.total} products:")
        for product in response.products:
            print(f"  - {product.name}: ${product.price}")

        # CreateProduct
        new_product = stub.CreateProduct(products_pb2.CreateProductRequest(
            name='Monitor',
            price=299.99,
            stock=15
        ))
        print(f"Created product: {new_product.id}")

        # StreamOrders (server streaming): iterating the call yields each
        # Order as the server sends it
        print("\nStreaming orders:")
        for order in stub.StreamOrders(products_pb2.StreamOrdersRequest(user_id='user_123')):
            print(f"  Order {order.id}: ${order.total} - {order.status}")

if __name__ == '__main__':
    # Run server in background, then client
    import threading
    # daemon=True: the server thread does not keep the process alive once
    # run_client() has returned.
    server_thread = threading.Thread(target=serve, daemon=True)
    server_thread.start()
    # Fixed pause before connecting; presumably gives the server time to start.
    time.sleep(2)
    run_client()

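Cuándo usar cada uno:

REST: cuando importan el caching HTTP y una curva de aprendizaje baja, y el modelo de recursos con múltiples endpoints es suficiente.

GraphQL: cuando los clientes necesitan pedir exactamente los campos que usan (resuelve over/under-fetching) a través de un único endpoint con tipado fuerte.

gRPC: cuando la comunicación entre servicios exige alto rendimiento, contratos fuertemente tipados (Protobuf) y streaming bidireccional.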


Receta 2.9: Resiliencia - Circuit Breakers y Retry

Circuit Breaker Pattern:

from enum import Enum
from datetime import datetime, timedelta
import time
import random

class CircuitState(Enum):
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Failing: requests are rejected
    HALF_OPEN = "half_open"  # Probing: requests are let through to test recovery


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is OPEN and the call is rejected."""


class CircuitBreaker:
    """
    Circuit breaker that protects callers against cascading failures.

    States:
    - CLOSED: working normally; consecutive failures are counted
    - OPEN: too many failures; calls are rejected without being executed
    - HALF_OPEN: after ``timeout`` seconds, calls are let through again and
      ``success_threshold`` consecutive successes close the circuit
    """

    def __init__(
        self,
        failure_threshold=5,     # Failures before opening
        timeout=60,              # Seconds to stay OPEN
        success_threshold=2      # Successes needed to close from HALF_OPEN
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` through the circuit breaker.

        Returns whatever ``func`` returns.

        Raises:
            CircuitOpenError: the circuit is OPEN and the timeout has not
                elapsed; ``func`` is not called.
            Exception: anything ``func`` raises is recorded as a failure and
                re-raised unchanged.
        """
        # If OPEN, check whether it is time to probe with HALF_OPEN
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0  # every recovery attempt starts from zero
                print("🔄 Circuit breaker: OPEN → HALF_OPEN (attempting recovery)")
            else:
                raise CircuitOpenError(f"Circuit breaker is OPEN (retry in {self._time_until_retry()}s)")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise

        self._on_success()
        return result

    def _on_success(self):
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            print(f"✓ Success in HALF_OPEN ({self.success_count}/{self.success_threshold})")

            if self.success_count >= self.success_threshold:
                self._reset()
        else:
            # In CLOSED, a success clears the failure streak
            self.failure_count = 0

    def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        print(f"✗ Failure ({self.failure_count}/{self.failure_threshold})")

        # HALF_OPEN must be checked first: failure_count is still at or above
        # the threshold during recovery, so testing the threshold first would
        # report the wrong transition and leave success_count untouched,
        # letting stale successes close the circuit too early.
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
            print("⚠️  Circuit breaker: HALF_OPEN → OPEN (failure during recovery)")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print("⚠️  Circuit breaker: CLOSED → OPEN (too many failures)")

    def _reset(self):
        """Return to CLOSED and clear both counters."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        print("✓ Circuit breaker: HALF_OPEN → CLOSED (recovered)")

    def _should_attempt_reset(self):
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if not self.last_failure_time:
            return True

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout

    def _time_until_retry(self):
        """Return the seconds remaining before a recovery attempt is allowed."""
        if not self.last_failure_time:
            return 0

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return max(0, self.timeout - elapsed)

# ===== RETRY CON EXPONENTIAL BACKOFF =====

def retry_with_backoff(
    func,
    max_retries=3,
    initial_delay=1,
    max_delay=60,
    backoff_factor=2,
    exceptions=(Exception,),
    sleep=None
):
    """
    Retry ``func`` with exponential backoff and jitter.

    Delays: initial_delay, then multiplied by backoff_factor after every
    failed attempt (1s, 2s, 4s, 8s, ...), capped at max_delay.
    Jitter: up to 10% extra randomness to avoid a thundering herd.

    Args:
        func: zero-argument callable to run.
        max_retries: retries after the first attempt (max_retries + 1 calls in total).
        initial_delay: seconds to wait before the first retry.
        max_delay: upper bound for the base delay, before jitter.
        backoff_factor: multiplier applied to the delay after each failure.
        exceptions: exception types that trigger a retry; others propagate at once.
        sleep: callable used to wait; defaults to ``time.sleep``.

    Returns:
        The value returned by the first successful call of ``func``.

    Raises:
        ValueError: if ``max_retries`` is negative.
        Exception: the last exception raised by ``func`` once retries are exhausted.
    """
    if max_retries < 0:
        # range(max_retries + 1) would be empty and None would be returned
        # without func ever being called.
        raise ValueError("max_retries must be >= 0")
    if sleep is None:
        sleep = time.sleep

    delay = min(initial_delay, max_delay)

    for attempt in range(max_retries + 1):
        try:
            return func()

        except exceptions as e:
            if attempt == max_retries:
                print(f"❌ Failed after {max_retries + 1} attempts")
                raise

            # Wait the current delay plus jitter...
            jitter = random.uniform(0, delay * 0.1)  # 10% jitter
            sleep_time = delay + jitter

            print(f"⚠️  Attempt {attempt + 1} failed: {e}")
            print(f"   Retrying in {sleep_time:.2f}s...")
            sleep(sleep_time)

            # ...and only then grow it, so the first wait is initial_delay
            delay = min(delay * backoff_factor, max_delay)

# ===== DEMOSTRACIÓN =====

def unreliable_service():
    """Simulate a service that fails at random (about 70% of the calls)."""
    roll = random.random()
    if roll >= 0.7:  # remaining 30%: the call succeeds
        return "Success!"
    raise Exception("Service unavailable")

# Demo: drive the circuit breaker with a randomly failing service.
print("=== Circuit Breaker Demo ===\n")

# Open after 3 failures, stay open for 5 seconds, close after 2 successes.
cb = CircuitBreaker(
    failure_threshold=3,
    timeout=5,
    success_threshold=2
)

# Make multiple calls, one per second
for i in range(20):
    try:
        result = cb.call(unreliable_service)
        print(f"✓ Request {i+1}: {result}\n")
    except Exception as e:
        # Covers both failures of the service and calls rejected while OPEN
        print(f"✗ Request {i+1}: {e}\n")
    
    time.sleep(1)

# Demo: retry a service that recovers on its third call.
print("\n=== Retry with Backoff Demo ===\n")

# One-element list so flaky_service can mutate the counter without `global`.
attempt_count = [0]

def flaky_service():
    """Service that fails on its first 2 calls and succeeds afterwards."""
    attempt_count[0] += 1
    if attempt_count[0] < 3:
        raise Exception(f"Temporary failure (attempt {attempt_count[0]})")
    return "Success after retries!"

try:
    result = retry_with_backoff(
        flaky_service,
        max_retries=5,
        initial_delay=0.5,
        backoff_factor=2
    )
    print(f"\n✓ Final result: {result}")
except Exception as e:
    print(f"\n❌ Final error: {e}")

Timeout y Fallback:

import signal
from contextlib import contextmanager

class TimeoutError(Exception):
    """Raised by the timeout() context manager when the time limit expires."""
    # NOTE(review): this name shadows Python's built-in TimeoutError inside
    # this module; code here that catches TimeoutError gets this class.
    pass

@contextmanager
def timeout(seconds):
    """Context manager that raises TimeoutError if its body runs longer than ``seconds``.

    Args:
        seconds: time limit; may be fractional. 0 disables the limit.

    Raises:
        TimeoutError: inside the ``with`` body once the limit expires.

    Built on SIGALRM and the process-wide real-time interval timer, so it is
    available on Unix only and must be used from the main thread.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")

    # Install the handler, remembering the previous one
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)

    try:
        # setitimer accepts fractional seconds, which signal.alarm does not.
        # Arming inside the try ensures the previous handler is restored even
        # if arming itself fails.
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)

def call_with_fallback(primary_func, fallback_func, timeout_seconds=5):
    """Run ``primary_func`` under a time limit; on any error or timeout return ``fallback_func()``."""
    try:
        with timeout(timeout_seconds):
            outcome = primary_func()
    except Exception as err:  # TimeoutError is an Exception subclass, so it lands here too
        print(f"⚠️  Primary failed: {err}, using fallback")
        return fallback_func()
    return outcome

# Uso
def slow_external_api():
    """Stand-in for an external call that takes 10 seconds to answer."""
    time.sleep(10)  # Very slow
    return {"data": "from API"}

def cache_fallback():
    """Stand-in for a cache read used when the primary call fails."""
    return {"data": "from cache (stale)"}

# The primary call sleeps for 10 seconds against a 2-second limit, so the
# fallback result is what gets printed.
result = call_with_fallback(
    slow_external_api,
    cache_fallback,
    timeout_seconds=2
)
print(f"Result: {result}")

¡Felicidades! 🎉

Has completado la FASE 2: Arquitectura y Diseño de Sistemas del roadmap.

Lo que has aprendido:

✅ Monolitos vs Microservicios
✅ Event-Driven Architecture y CQRS
✅ CAP Theorem y consistencia eventual
✅ Load Balancing y caching strategies
✅ SQL avanzado y optimización
✅ NoSQL (MongoDB, Redis)
✅ REST vs GraphQL vs gRPC
✅ Resiliencia con Circuit Breakers

Próximos pasos:

FASE 3: Infraestructura y DevOps


Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo