🔒 Developer Cookbook - FASE 4: Observabilidad y Monitoreo

Recetas prácticas para construir aplicaciones seguras, monitoreadas y observables


📚 Tabla de Contenidos

  1. Receta 4.9: Los 3 Pilares - Logs, Métricas, Traces
  2. Receta 4.10: Grafana Dashboards
  3. Receta 4.11: SLIs, SLOs, SLAs y Error Budgets
  4. Receta 4.12: Incident Response

Observabilidad y Monitoreo

Receta 4.9: Los 3 Pilares - Logs, Métricas, Traces

1. LOGS - Structured Logging:

import json
import logging
from datetime import datetime, timezone

from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """Custom formatter that emits structured (JSON) log records.

    Adds a UTC timestamp, the level name and the logger name to every
    record, plus optional per-request context (user_id / request_id)
    when the caller attached it via logging's ``extra=`` kwarg.
    """

    def add_fields(self, log_record, record, message_dict):
        """Populate *log_record* with standard and contextual fields."""
        super().add_fields(log_record, record, message_dict)

        # Standard fields. Use timezone-aware UTC: datetime.utcnow() is
        # deprecated since Python 3.12 and produces naive timestamps.
        log_record['timestamp'] = datetime.now(timezone.utc).isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name

        # Optional request context (set through `extra=` on the log call).
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id

# Configure logger
def setup_logging():
    """Configure structured JSON logging on the root logger.

    Returns the root logger. Safe to call more than once: a handler is
    only attached when the root logger has none yet, so repeated calls
    (e.g. on module re-import) do not produce duplicate log lines.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    if not logger.handlers:
        # Log to the standard stream (Docker/K8s captures it).
        handler = logging.StreamHandler()

        # JSON output via the custom formatter.
        handler.setFormatter(CustomJsonFormatter(
            '%(timestamp)s %(level)s %(name)s %(message)s'
        ))
        logger.addHandler(handler)

    return logger

logger = setup_logging()

# Usage with context
class RequestContext:
    """Mutable holder for per-request logging context."""

    def __init__(self):
        # Both fields start unset; middleware fills them in per request.
        self.request_id = self.user_id = None

    def set_request_id(self, request_id):
        """Record the correlation id of the current request."""
        self.request_id = request_id

    def set_user_id(self, user_id):
        """Record the id of the authenticated user."""
        self.user_id = user_id

context = RequestContext()

@app.before_request
def before_request():
    """Attach a fresh request id (and user id, if authenticated) to the context."""
    import uuid
    context.set_request_id(str(uuid.uuid4()))

    # Extract user_id from the JWT in the Authorization header.
    token = request.headers.get('Authorization')
    if token:
        # Authorization headers are conventionally "Bearer <token>";
        # strip the scheme so jwt.decode receives the raw token.
        if token.startswith('Bearer '):
            token = token[len('Bearer '):]
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
            context.set_user_id(payload['user_id'])
        except Exception:
            # Invalid/expired token: treat the request as anonymous.
            # (A bare `except:` would also swallow SystemExit/KeyboardInterrupt.)
            pass

# Logging with context
@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order, logging start/success/failure with request context."""
    ctx = {
        'request_id': context.request_id,
        'user_id': context.user_id,
    }

    logger.info(
        'Creating order',
        extra={**ctx, 'endpoint': '/api/orders', 'method': 'POST'}
    )

    try:
        order = process_order(request.json)

        logger.info(
            'Order created successfully',
            extra={**ctx, 'order_id': order['id'], 'total': order['total']}
        )

        return jsonify(order), 201

    except Exception as e:
        logger.error(
            'Failed to create order',
            extra={**ctx, 'error': str(e), 'error_type': type(e).__name__},
            exc_info=True  # include stack trace
        )

        return jsonify({'error': 'Internal error'}), 500

# Appropriate log levels
logger.debug('Detailed info for debugging')  # dev only
logger.info('Normal operation')              # significant events
logger.warning('Something unexpected')       # potential problem
logger.error('Error occurred')               # error affecting functionality
logger.critical('System unstable')           # needs immediate attention

2. MÉTRICAS - Prometheus:

from prometheus_client import Counter, Histogram, Gauge, Summary, generate_latest
from prometheus_client import REGISTRY
from flask import Response
import time

# Metric definitions
# Monotonic request counter, labelled for per-route/status breakdown.
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Latency histogram; buckets enable histogram_quantile() percentiles.
http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

# Point-in-time value that can go up and down.
active_users = Gauge(
    'active_users',
    'Number of active users'
)

# Client-side streaming aggregation (count/sum) of observed values.
order_value = Summary(
    'order_value_dollars',
    'Order value in dollars'
)

# Middleware to instrument requests
@app.before_request
def before_request_metrics():
    # Stamp the start time on the request object so the after-request
    # hook can compute the total handling duration.
    request.start_time = time.time()

@app.after_request
def after_request_metrics(response):
    """Record request count and latency metrics for every response."""
    endpoint = request.endpoint or 'unknown'

    # Count the request by method/endpoint/status.
    http_requests_total.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code
    ).inc()

    # Record how long the request took (only if the before-hook ran).
    start = getattr(request, 'start_time', None)
    if start is not None:
        http_request_duration_seconds.labels(
            method=request.method,
            endpoint=endpoint
        ).observe(time.time() - start)

    return response

# Metrics endpoint
@app.route('/metrics')
def metrics():
    """Expose all registered metrics for Prometheus scraping."""
    # generate_latest serialises the registry in the Prometheus text format.
    return Response(generate_latest(REGISTRY), mimetype='text/plain')

# Business metrics
@app.route('/api/orders', methods=['POST'])
def create_order():
    # NOTE(review): this example re-registers the same route and function
    # name as the logging example above — in one real app only one of the
    # two handlers can exist.
    order = process_order(request.json)

    # Feed the order value into the Summary metric.
    order_value.observe(order['total'])

    return jsonify(order)

# Custom metrics
class BusinessMetrics:
    """Domain-level (business) metrics exposed via prometheus_client."""

    def __init__(self):
        # Monotonic revenue counter, split by product category.
        self.revenue = Counter(
            'revenue_total_dollars',
            'Total revenue',
            ['product_category']
        )

        # Distribution of items per cart.
        self.cart_size = Histogram(
            'cart_size_items',
            'Items per cart',
            buckets=[1, 5, 10, 20, 50]
        )

        # Distribution of checkout completion times (seconds).
        self.checkout_duration = Histogram(
            'checkout_duration_seconds',
            'Time to complete checkout',
            buckets=[1, 5, 10, 30, 60, 120]
        )

    def record_sale(self, category: str, amount: float):
        """Add *amount* to the revenue counter for *category*."""
        self.revenue.labels(product_category=category).inc(amount)

    def record_cart(self, items: int):
        """Observe a cart containing *items* items."""
        self.cart_size.observe(items)

    def record_checkout(self, duration: float):
        """Observe a checkout that took *duration* seconds."""
        self.checkout_duration.observe(duration)

business_metrics = BusinessMetrics()

Prometheus Configuration:

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # API metrics
  - job_name: 'myapp-api'
    static_configs:
      - targets: ['api:3000']
    metrics_path: '/metrics'

  # Kubernetes pods
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__

# Alert rules
rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

Alert Rules:

# alerts.yml
groups:
  - name: api_alerts
    interval: 30s
    rules:
      # Alta tasa de errores
      - alert: HighErrorRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # Latencia alta
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
          ) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P95 latency is {{ $value }}s"

      # Pocos recursos
      - alert: HighMemoryUsage
        expr: |
          (
            node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes
          ) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value | humanizePercentage }}"

3. TRACES - OpenTelemetry:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Configure tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure exporter (Jaeger agent over UDP/thrift, default port)
jaeger_exporter = JaegerExporter(
    agent_host_name='jaeger',
    agent_port=6831,
)

# Batch spans before export to avoid per-span network overhead.
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Auto-instrument Flask (one server span per incoming request)
FlaskInstrumentor().instrument_app(app)

# Auto-instrument outgoing HTTP calls made with `requests`
RequestsInstrumentor().instrument()

# Auto-instrument SQLAlchemy queries
SQLAlchemyInstrumentor().instrument(engine=db.engine)

# Manual traces
@app.route('/api/orders/<order_id>')
def get_order(order_id):
    """Fetch an order by id, recording a span for each processing stage."""
    with tracer.start_as_current_span('get_order') as span:
        # Annotate the span so it can be filtered in the tracing UI.
        span.set_attribute('order.id', order_id)
        span.set_attribute('user.id', g.user['id'])

        # The database lookup gets its own child span.
        with tracer.start_as_current_span('database.query'):
            order = db.orders.find_one({'_id': order_id})

        if not order:
            span.set_attribute('order.found', False)
            return jsonify({'error': 'Not found'}), 404

        span.set_attribute('order.found', True)
        span.set_attribute('order.total', order['total'])

        # Enrichment (user/product data) under its own span.
        with tracer.start_as_current_span('enrich_order_data'):
            enriched = enrich_order(order)

        return jsonify(enriched)

def enrich_order(order):
    """Return *order* augmented with its user and product records."""
    with tracer.start_as_current_span('fetch_user_data') as span:
        span.set_attribute('user.id', order['user_id'])
        user = fetch_user(order['user_id'])

    with tracer.start_as_current_span('fetch_products'):
        product_ids = [item['product_id'] for item in order['items']]
        products = fetch_products(product_ids)

    enriched = dict(order)
    enriched['user'] = user
    enriched['products'] = products
    return enriched

# Distributed tracing context propagation
import requests

def call_external_service(data):
    """POST *data* to the external service inside a traced span.

    RequestsInstrumentor propagates the trace context automatically in
    the outgoing HTTP headers.
    """
    with tracer.start_as_current_span('external_api_call') as span:
        resp = requests.post(
            'https://external-service.com/api',
            json=data
        )
        span.set_attribute('http.status_code', resp.status_code)
        return resp.json()

Receta 4.10: Grafana Dashboards

Dashboard Configuration:

{
  "dashboard": {
    "title": "API Performance Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total[5m])) by (status)"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m]))"
          }
        ],
        "type": "stat",
        "fieldConfig": {
          "defaults": {
            "unit": "percentunit",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"value": 0, "color": "green"},
                {"value": 0.01, "color": "yellow"},
                {"value": 0.05, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "title": "Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint))"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Users",
        "targets": [
          {
            "expr": "active_users"
          }
        ],
        "type": "stat"
      }
    ]
  }
}

Loki - Log Aggregation:

# promtail-config.yml (Log shipper)
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  # Logs de contenedores Docker
  - job_name: docker
    static_configs:
      - targets:
          - localhost
        labels:
          job: docker
          __path__: /var/lib/docker/containers/*/*.log

    pipeline_stages:
      # Parsear JSON
      - json:
          expressions:
            level: level
            message: message
            timestamp: timestamp

      # Extraer labels
      - labels:
          level:

      # Formatear timestamp
      - timestamp:
          source: timestamp
          format: RFC3339

LogQL Queries (Loki Query Language):

# Todos los logs de API
{job="api"}

# Logs de error
{job="api"} |= "ERROR"

# Logs de usuario específico
{job="api"} | json | user_id="12345"

# Conteo de errores por endpoint
sum by (endpoint) (
  count_over_time({job="api"} |= "ERROR" [5m])
)

# Latencia promedio
avg_over_time({job="api"} | json | unwrap duration [5m])

# Top 10 endpoints más lentos
topk(10,
  avg_over_time({job="api"} | json | unwrap duration [5m]) by (endpoint)
)

Receta 4.11: SLIs, SLOs, SLAs y Error Budgets

Definiciones:

Ejemplo de SLIs:

from dataclasses import dataclass
from typing import List
from datetime import datetime, timedelta

@dataclass
class SLI:
    """Service Level Indicator: a measurable signal of service health."""
    name: str
    description: str
    query: str  # Prometheus query producing the indicator value
    good_threshold: float  # value at/above which the signal counts as "good"

# SLI definitions
SLIS = [
    SLI(
        name='availability',
        description='Percentage of successful requests',
        query='''
            sum(rate(http_requests_total{status!~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
        ''',
        good_threshold=0.99  # 99% of requests succeed
    ),
    SLI(
        name='latency',
        description='95th percentile latency',
        query='''
            histogram_quantile(0.95,
                sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
            )
        ''',
        good_threshold=0.5  # P95 < 500ms
    ),
    SLI(
        name='error_rate',
        description='Percentage of error responses',
        query='''
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))
        ''',
        good_threshold=0.01  # < 1% errors
    )
]

@dataclass
class SLO:
    """Service Level Objective: a target for an SLI over a time window."""
    sli: SLI
    target: float  # target fraction (e.g. 0.999 for 99.9%)
    window: timedelta  # measurement window (e.g. 30 days)

# SLO definitions
SLOS = [
    SLO(
        sli=SLIS[0],  # availability
        target=0.999,  # 99.9%
        window=timedelta(days=30)
    ),
    SLO(
        sli=SLIS[1],  # latency
        target=0.95,  # 95% of requests < 500ms
        window=timedelta(days=30)
    )
]

class ErrorBudget:
    """Error-budget arithmetic for a given SLO."""

    def __init__(self, slo: SLO):
        self.slo = slo

    def calculate_budget(self, total_requests: int) -> dict:
        """Return the error budget for *total_requests* over the SLO window.

        Error Budget = (1 - SLO target) x total requests.
        """
        allowed = (1 - self.slo.target) * total_requests

        return {
            'slo_target': self.slo.target,
            'total_requests': total_requests,
            'allowed_errors': int(allowed),
            'allowed_downtime_minutes': self._calculate_downtime(),
        }

    def _calculate_downtime(self) -> float:
        """Minutes of downtime the SLO window tolerates."""
        minutes_in_window = self.slo.window.total_seconds() / 60
        return minutes_in_window * (1 - self.slo.target)

    def is_budget_exhausted(self, actual_errors: int, total_requests: int) -> bool:
        """True once observed errors meet or exceed the allowed errors."""
        allowed = self.calculate_budget(total_requests)['allowed_errors']
        return actual_errors >= allowed

# Usage
availability_slo = SLOS[0]
error_budget = ErrorBudget(availability_slo)

# Over 30 days with 10M requests
budget = error_budget.calculate_budget(10_000_000)
print(f"Allowed errors: {budget['allowed_errors']}")           # 10,000
print(f"Allowed downtime: {budget['allowed_downtime_minutes']:.2f} minutes")  # 43.2 min

# Check whether the budget is exhausted
actual_errors = 5000
is_exhausted = error_budget.is_budget_exhausted(actual_errors, 10_000_000)
print(f"Budget exhausted: {is_exhausted}")  # False

Alerting basado en Error Budget:

# error-budget-alerts.yml
groups:
  - name: error_budget
    rules:
      # Error budget 50% consumido
      - alert: ErrorBudget50PercentConsumed
        expr: |
          (
            1 - (
              sum(rate(http_requests_total{status!~"5.."}[30d]))
              /
              sum(rate(http_requests_total[30d]))
            )
          ) / (1 - 0.999) > 0.5
        labels:
          severity: warning
        annotations:
          summary: "50% of error budget consumed"
          description: "We've used half our error budget for this month"

      # Error budget 90% consumido
      - alert: ErrorBudget90PercentConsumed
        expr: |
          (
            1 - (
              sum(rate(http_requests_total{status!~"5.."}[30d]))
              /
              sum(rate(http_requests_total[30d]))
            )
          ) / (1 - 0.999) > 0.9
        labels:
          severity: critical
        annotations:
          summary: "90% of error budget consumed"
          description: "URGENT: Error budget almost exhausted. Freeze deployments."

      # Burn rate alto (consumiendo budget rápidamente)
      - alert: HighErrorBudgetBurnRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            /
            sum(rate(http_requests_total[1h]))
          ) > (14.4 * (1 - 0.999))
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error budget burn rate"
          description: "At this rate, we'll exhaust error budget in 2 days"

Dashboard de SLOs:

# Generar reporte de SLOs
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta

class SLOReporter:
    """Builds SLO compliance reports from Prometheus data."""

    def __init__(self, prometheus_url: str):
        self.prom = PrometheusConnect(url=prometheus_url)

    def generate_report(self, slo: SLO) -> dict:
        """Evaluate *slo* over its window and return a compliance report.

        Raises ValueError when Prometheus returns no samples for the SLI.
        """
        end_time = datetime.now()
        start_time = end_time - slo.window

        # Evaluate the SLI query over the window, one sample per hour.
        result = self.prom.custom_query_range(
            query=slo.sli.query,
            start_time=start_time,
            end_time=end_time,
            step='1h'
        )

        # Average the sampled values; fail clearly on an empty result
        # instead of dividing by zero.
        values = [float(point[1]) for point in result[0]['values']]
        if not values:
            raise ValueError(f"No data returned for SLI '{slo.sli.name}'")
        avg_value = sum(values) / len(values)

        # Compare with target
        is_meeting_slo = avg_value >= slo.target

        # Compute the error budget over the same period.
        total_requests = self._get_total_requests(start_time, end_time)
        error_budget = ErrorBudget(slo)
        budget_info = error_budget.calculate_budget(total_requests)

        return {
            'slo_name': slo.sli.name,
            'target': slo.target,
            'actual': avg_value,
            'is_meeting_slo': is_meeting_slo,
            'margin': avg_value - slo.target,
            'error_budget': budget_info,
            'period': {
                'start': start_time.isoformat(),
                'end': end_time.isoformat()
            }
        }

    def _get_total_requests(self, start_time, end_time) -> int:
        """Total requests over [start_time, end_time].

        The lookback range is derived from the arguments rather than the
        previously hard-coded 30d, so it always matches slo.window.
        """
        seconds = int((end_time - start_time).total_seconds())
        query = f'sum(increase(http_requests_total[{seconds}s]))'
        result = self.prom.custom_query(query)
        return int(float(result[0]['value'][1]))

# Usage
reporter = SLOReporter('http://prometheus:9090')

for slo in SLOS:
    report = reporter.generate_report(slo)

    print(f"\n=== {report['slo_name'].upper()} ===")
    print(f"Target: {report['target']*100}%")
    print(f"Actual: {report['actual']*100:.2f}%")
    print(f"Status: {'✅ MEETING' if report['is_meeting_slo'] else '❌ FAILING'}")
    print(f"Margin: {report['margin']*100:+.2f}%")
    print(f"Error Budget:")
    print(f"  Allowed errors: {report['error_budget']['allowed_errors']}")
    print(f"  Allowed downtime: {report['error_budget']['allowed_downtime_minutes']:.1f} min")

Receta 4.12: Incident Response

On-Call Runbook:

# Incident Response Runbook

## Severity Levels

### SEV-1 (Critical)
- Complete service outage
- Data loss/corruption
- Security breach
- Response: Immediate (24/7)

### SEV-2 (High)
- Partial service degradation
- Major feature broken
- Performance severely impacted
- Response: Within 1 hour

### SEV-3 (Medium)
- Minor feature broken
- Workaround available
- Response: Within 4 hours

### SEV-4 (Low)
- Cosmetic issues
- Documentation errors
- Response: Next business day

## Incident Response Process

1. **Detection** — Alert fired, user report, monitoring detected
2. **Triage** — Classify severity, page on-call engineer, create incident channel
3. **Mitigation** — Stop the bleeding, implement workaround, rollback if needed
4. **Investigation** — Identify root cause, gather evidence, document timeline
5. **Resolution** — Deploy fix, verify resolution, monitor for recurrence
6. **Post-Mortem** — Write incident report, identify action items, update runbooks

## Common Issues

### High Error Rate

```bash
# Check error logs
kubectl logs -l app=api --tail=100 | grep ERROR

# Check recent deployments
kubectl rollout history deployment/api

# Rollback deployment
kubectl rollout undo deployment/api

# Scale up
kubectl scale deployment/api --replicas=10
```

### High Latency

```sql
-- Check slow queries
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
```

### Database Connection Pool Exhausted

```sql
-- Check active connections
SELECT count(*) FROM pg_stat_activity;

-- Kill long-running queries
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
AND state = 'active'
AND now() - query_start > interval '5 minutes';
```

**Incident Management Tool:**

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional

class Severity(Enum):
    """Incident severity levels; SEV1 is the highest impact."""
    SEV1 = "critical"
    SEV2 = "high"
    SEV3 = "medium"
    SEV4 = "low"

class IncidentStatus(Enum):
    """Lifecycle states of an incident, from detection to resolution."""
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"

@dataclass
class Incident:
    """Incident tracking record with an append-only event timeline."""
    id: str
    title: str
    severity: Severity
    status: IncidentStatus
    created_at: datetime
    detected_by: str
    assigned_to: Optional[str] = None
    description: str = ""
    # Defaults to None (not []) so the dataclass avoids a shared mutable
    # default; __post_init__ swaps in a fresh list per instance.
    timeline: Optional[List[dict]] = None
    root_cause: Optional[str] = None
    resolution: Optional[str] = None

    def __post_init__(self):
        if self.timeline is None:
            self.timeline = []

        # Every incident starts its timeline with a creation event.
        self.add_timeline_event(
            f"Incident created by {self.detected_by}",
            IncidentStatus.INVESTIGATING
        )

    def add_timeline_event(self, description: str, new_status: Optional[IncidentStatus] = None):
        """Append a timeline event; optionally transition to *new_status*."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # consider datetime.now(timezone.utc) here and in generate_report.
        event = {
            'timestamp': datetime.utcnow(),
            'description': description
        }

        if new_status:
            event['status_change'] = {
                'from': self.status.value,
                'to': new_status.value
            }
            self.status = new_status

        self.timeline.append(event)

    def assign(self, engineer: str):
        """Assign the incident to *engineer* and record it in the timeline."""
        self.assigned_to = engineer
        self.add_timeline_event(f"Assigned to {engineer}")

    def update_status(self, new_status: IncidentStatus, note: str):
        """Transition to *new_status* with an explanatory timeline note."""
        self.add_timeline_event(note, new_status)

    def resolve(self, root_cause: str, resolution: str):
        """Mark the incident resolved, storing root cause and resolution."""
        self.root_cause = root_cause
        self.resolution = resolution
        self.update_status(
            IncidentStatus.RESOLVED,
            f"Incident resolved. Root cause: {root_cause}"
        )

    def generate_report(self) -> str:
        """Render a Markdown post-incident report from the stored fields."""
        # Duration is measured from creation until report generation time.
        duration = datetime.utcnow() - self.created_at

        report = f"""
# Incident Report: {self.title}

**Incident ID:** {self.id}
**Severity:** {self.severity.value}
**Status:** {self.status.value}
**Duration:** {duration}
**Detected By:** {self.detected_by}
**Assigned To:** {self.assigned_to}

## Description
{self.description}

## Timeline
"""
        for event in self.timeline:
            report += f"\n- **{event['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}**: {event['description']}"

        if self.root_cause:
            report += f"\n\n## Root Cause\n{self.root_cause}"

        if self.resolution:
            report += f"\n\n## Resolution\n{self.resolution}"

        return report

# Usage
incident = Incident(
    id="INC-2024-001",
    title="High error rate in API",
    severity=Severity.SEV1,
    status=IncidentStatus.INVESTIGATING,
    created_at=datetime.utcnow(),
    detected_by="monitoring-system",
    description="Error rate spiked to 15% at 14:23 UTC"
)

incident.assign("alice@example.com")
incident.add_timeline_event("Identified database connection pool exhaustion")
incident.update_status(IncidentStatus.IDENTIFIED, "Root cause identified")
incident.add_timeline_event("Increased pool size from 20 to 50")
incident.update_status(IncidentStatus.MONITORING, "Monitoring for stability")

incident.resolve(
    root_cause="Database connection pool too small for traffic spike",
    resolution="Increased pool size and added auto-scaling"
)

print(incident.generate_report())

```

¡Felicidades!

Has completado la FASE 4: Seguridad y Observabilidad del roadmap.

Lo que has aprendido:

Próximos pasos:

FASE 5: Ingeniería de Datos


Versión: 1.0 Fecha: 2024 Autor: Roadmap del Desarrollador del Futuro Licencia: Uso educativo