📊 Developer Cookbook - PHASE 5: Data Pipelines

Practical recipes for building robust, scalable data pipelines


📚 Table of Contents

  1. Recipe 5.1: ETL vs ELT - Which to Use?
  2. Recipe 5.2: Apache Airflow - Pipeline Orchestration
  3. Recipe 5.3: Apache Spark - Distributed Processing
  4. Recipe 5.4: Data Warehousing vs Data Lakes
  5. Recipe 5.5: Stream Processing with Kafka

Data Pipelines

Recipe 5.1: ETL vs ELT - Which to Use?

What are they?

ETL (Extract, Transform, Load) transforms data before loading it into the target system; ELT (Extract, Load, Transform) loads raw data first and transforms it inside the warehouse, using the warehouse's own compute.

Comparison:

| Feature | ETL | ELT |
|---|---|---|
| Transformation | Before loading | After loading |
| Initial speed | 🟡 Slower | 🟢 Faster |
| Flexibility | 🔴 Less flexible | 🟢 Very flexible |
| Compute costs | 🟢 Predictable | 🟡 Variable |
| Best for | Traditional data warehouses | Cloud data warehouses |
| Storage | 🟢 Less storage | 🔴 More storage |
| Use cases | Sensitive data, compliance | Exploration, analytics |

ETL example in Python:

import pandas as pd
from datetime import datetime
import psycopg2
from typing import Dict, List

class ETLPipeline:
    """Traditional ETL pipeline"""
    
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
    
    def extract(self, query: str) -> pd.DataFrame:
        """E - Extract: pull data from the source"""
        print("📥 Extracting data from source...")
        
        # Connect to the source database
        conn = psycopg2.connect(**self.source_config)
        
        # Extract data
        df = pd.read_sql(query, conn)
        conn.close()
        
        print(f"✅ Extracted {len(df)} rows")
        return df
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """T - Transform: clean and transform the data"""
        print("⚙️ Transforming data...")
        
        # 1. Drop rows with null keys
        df = df.dropna(subset=['customer_id', 'order_date'])
        
        # 2. Normalize text columns
        df['customer_name'] = df['customer_name'].str.strip().str.title()
        df['email'] = df['email'].str.lower()
        
        # 3. Convert data types
        df['order_date'] = pd.to_datetime(df['order_date'])
        df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')
        
        # 4. Create derived columns
        df['order_year'] = df['order_date'].dt.year
        df['order_month'] = df['order_date'].dt.month
        df['order_quarter'] = df['order_date'].dt.quarter
        
        # 5. Segment customers
        df['customer_segment'] = pd.cut(
            df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )
        
        # 6. Remove duplicates
        df = df.drop_duplicates(subset=['order_id'])
        
        # 7. Add metadata
        df['etl_processed_at'] = datetime.now()
        df['etl_version'] = '1.0'
        
        print(f"✅ Transformed to {len(df)} rows")
        return df
    
    def load(self, df: pd.DataFrame, table_name: str):
        """L - Load: write data to the target"""
        print(f"📤 Loading data to {table_name}...")
        
        # Connect to the target database
        conn = psycopg2.connect(**self.target_config)
        cursor = conn.cursor()
        
        # Truncate the table if it exists (full refresh)
        cursor.execute(f"TRUNCATE TABLE {table_name}")
        
        # Bulk insert via COPY (note: copy_from with sep=',' assumes no field
        # contains a comma; use copy_expert with CSV format for quoted fields)
        from io import StringIO
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        
        cursor.copy_from(
            buffer,
            table_name,
            sep=',',
            null='',
            columns=df.columns.tolist()
        )
        
        conn.commit()
        cursor.close()
        conn.close()
        
        print(f"✅ Loaded {len(df)} rows to {table_name}")
    
    def run(self, extract_query: str, target_table: str):
        """Ejecutar pipeline completo"""
        start_time = datetime.now()
        print(f"\n{'='*60}")
        print(f"🚀 Starting ETL Pipeline - {start_time}")
        print(f"{'='*60}\n")
        
        try:
            # Extract
            raw_data = self.extract(extract_query)
            
            # Transform
            clean_data = self.transform(raw_data)
            
            # Load
            self.load(clean_data, target_table)
            
            # Log success
            duration = (datetime.now() - start_time).total_seconds()
            print(f"\n{'='*60}")
            print(f"✅ ETL Pipeline completed successfully in {duration:.2f}s")
            print(f"{'='*60}\n")
            
        except Exception as e:
            print(f"\n❌ ETL Pipeline failed: {str(e)}")
            raise

# Usage
source_config = {
    'host': 'source-db.example.com',
    'database': 'production',
    'user': 'etl_user',
    'password': 'password'
}

target_config = {
    'host': 'warehouse.example.com',
    'database': 'analytics',
    'user': 'warehouse_user',
    'password': 'password'
}

pipeline = ETLPipeline(source_config, target_config)

extract_query = """
    SELECT 
        o.order_id,
        o.customer_id,
        c.customer_name,
        c.email,
        o.order_date,
        o.total_amount
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
"""

pipeline.run(extract_query, 'dim_orders')

ELT example with dbt (Data Build Tool):

-- models/staging/stg_orders.sql
-- Step 1: Read raw data (already loaded into the warehouse)
{{
  config(
    materialized='view'
  )
}}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    _loaded_at
FROM {{ source('raw', 'orders') }}

-- models/intermediate/int_orders_cleaned.sql
-- Step 2: Clean the data
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

SELECT
    order_id,
    customer_id,
    CAST(order_date AS DATE) as order_date,
    ROUND(total_amount, 2) as total_amount,
    EXTRACT(YEAR FROM order_date) as order_year,
    EXTRACT(MONTH FROM order_date) as order_month,
    EXTRACT(QUARTER FROM order_date) as order_quarter
FROM {{ ref('stg_orders') }}
WHERE order_date IS NOT NULL
  AND total_amount > 0

{% if is_incremental() %}
  -- Only process new records
  AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

-- models/marts/fct_orders.sql
-- Step 3: Build the final table for analytics
{{
  config(
    materialized='table'
  )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_date,
    o.order_year,
    o.order_month,
    o.total_amount,
    CASE
        WHEN o.total_amount < 100 THEN 'Bronze'
        WHEN o.total_amount < 500 THEN 'Silver'
        WHEN o.total_amount < 1000 THEN 'Gold'
        ELSE 'Platinum'
    END as order_tier
FROM {{ ref('int_orders_cleaned') }} o
LEFT JOIN {{ ref('dim_customers') }} c 
    ON o.customer_id = c.customer_id
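
In practice these models run on a schedule. A minimal sketch of driving dbt from Python via subprocess (the "marts" selector is a hypothetical example; adjust to your project):

import subprocess

def run_dbt_models(select: str = "marts") -> None:
    """Run a subset of dbt models and fail loudly if any model errors."""
    result = subprocess.run(
        ["dbt", "run", "--select", select],  # same command you would type by hand
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    result.check_returncode()  # raises CalledProcessError on a non-zero exit

run_dbt_models("marts")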

When to use each:

ETL:

- Traditional on-premise data warehouses where compute is limited
- Sensitive data that must be masked or filtered before landing (compliance)
- Stable, well-defined transformation logic

ELT:

- Cloud data warehouses (Snowflake, BigQuery, Redshift) with cheap, elastic compute
- Exploratory analytics where requirements change frequently
- Keeping raw data available for reprocessing


Recipe 5.2: Apache Airflow - Pipeline Orchestration

What is Airflow? A platform for authoring, scheduling, and monitoring data workflows as code (DAGs).

Key concepts:

- DAG: a directed acyclic graph of tasks, scheduled as a unit
- Operator / Task: a single unit of work (PythonOperator, BashOperator, sensors, ...)
- XCom: small messages passed between tasks
- Scheduler and executor: decide when and where task instances run

Basic DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd

# Default arguments applied to every task
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1)
}

# Define the DAG
dag = DAG(
    'daily_sales_etl',
    default_args=default_args,
    description='Daily sales ETL pipeline',
    schedule_interval='0 2 * * *',  # Every day at 2am
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'etl', 'daily']
)

# Task 1: Wait for the input file to exist
wait_for_file = FileSensor(
    task_id='wait_for_sales_file',
    filepath='/data/sales/{{ ds }}.csv',  # {{ ds }} = execution date
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Give up after 1 hour
    mode='poke',
    dag=dag
)

# Task 2: Extract data
def extract_sales(**context):
    """Extract data from the daily file"""
    execution_date = context['ds']
    
    df = pd.read_csv(f'/data/sales/{execution_date}.csv')
    
    # Push to XCom for downstream tasks
    context['task_instance'].xcom_push(
        key='row_count',
        value=len(df)
    )
    
    print(f"✅ Extracted {len(df)} rows")
    # NOTE: returning a whole DataFrame via XCom only works for small datasets
    return df.to_json()

extract_task = PythonOperator(
    task_id='extract_sales_data',
    python_callable=extract_sales,  # context kwargs are passed automatically in Airflow 2
    dag=dag
)

# Task 3: Transform data
def transform_sales(**context):
    """Transform the extracted data"""
    # Pull data from the previous task via XCom
    raw_data = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data'
    )
    
    df = pd.read_json(raw_data)
    
    # Transformations
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df['revenue'] = df['quantity'] * df['price']
    df = df.dropna()
    
    print(f"✅ Transformed {len(df)} rows")
    return df.to_json()

transform_task = PythonOperator(
    task_id='transform_sales_data',
    python_callable=transform_sales,
    dag=dag
)

# Task 4: Load into the database
def load_to_database(**context):
    """Load data into the warehouse"""
    from sqlalchemy import create_engine
    
    transformed_data = context['task_instance'].xcom_pull(
        task_ids='transform_sales_data'
    )
    
    df = pd.read_json(transformed_data)
    
    engine = create_engine('postgresql://user:pass@host:5432/db')
    df.to_sql(
        'sales_fact',
        engine,
        if_exists='append',
        index=False
    )
    
    print(f"✅ Loaded {len(df)} rows to database")

load_task = PythonOperator(
    task_id='load_to_database',
    python_callable=load_to_database,
    dag=dag
)

# Task 5: Generate the daily report
generate_report = BashOperator(
    task_id='generate_daily_report',
    bash_command="""
        python /scripts/generate_report.py \
            --date {{ ds }} \
            --output /reports/sales_{{ ds }}.pdf
    """,
    dag=dag
)

# Task 6: Cleanup
cleanup = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/sales_{{ ds }}*',
    dag=dag
)

# Define dependencies (execution order)
wait_for_file >> extract_task >> transform_task >> load_task >> generate_report >> cleanup
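
The same Extract → Transform → Load chain can be written more concisely with the Airflow 2 TaskFlow API, which turns return values into XComs automatically. A minimal sketch under the same (hypothetical) file paths and connection string:

from airflow.decorators import dag, task

@dag(schedule_interval='0 2 * * *', start_date=datetime(2024, 1, 1), catchup=False)
def daily_sales_etl_taskflow():
    
    @task
    def extract(ds=None):
        # ds (the execution date) is injected by Airflow
        return pd.read_csv(f'/data/sales/{ds}.csv').to_json()
    
    @task
    def transform(raw: str):
        df = pd.read_json(raw)
        df['revenue'] = df['quantity'] * df['price']
        return df.dropna().to_json()
    
    @task
    def load(clean: str):
        from sqlalchemy import create_engine
        df = pd.read_json(clean)
        engine = create_engine('postgresql://user:pass@host:5432/db')
        df.to_sql('sales_fact', engine, if_exists='append', index=False)
    
    # Return values flow between tasks as XComs
    load(transform(extract()))

daily_sales_etl_taskflow()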

DAG with branching (conditional execution):

from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator  # EmptyOperator in Airflow >= 2.2

def check_data_quality(**context):
    """Decide the next step based on data quality"""
    row_count = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data',
        key='row_count'
    )
    
    if row_count > 1000:
        return 'process_large_dataset'
    elif row_count > 0:
        return 'process_small_dataset'
    else:
        return 'skip_processing'

branch_task = BranchPythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=dag
)

process_large = PythonOperator(
    task_id='process_large_dataset',
    python_callable=lambda: print("Processing large dataset with Spark"),
    dag=dag
)

process_small = PythonOperator(
    task_id='process_small_dataset',
    python_callable=lambda: print("Processing small dataset with Pandas"),
    dag=dag
)

skip = DummyOperator(
    task_id='skip_processing',
    dag=dag
)

join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',  # Continue if at least one branch succeeded
    dag=dag
)

# Dependencies
extract_task >> branch_task >> [process_large, process_small, skip] >> join

TaskGroups (organizing related tasks):

from airflow.utils.task_group import TaskGroup

with DAG('complex_pipeline', ...) as dag:
    
    start = DummyOperator(task_id='start')
    
    # TaskGroup for customer processing
    with TaskGroup('process_customers', tooltip='Customer data processing') as customer_group:
        extract_customers = PythonOperator(...)
        transform_customers = PythonOperator(...)
        load_customers = PythonOperator(...)
        
        extract_customers >> transform_customers >> load_customers
    
    # TaskGroup for order processing
    with TaskGroup('process_orders', tooltip='Order data processing') as order_group:
        extract_orders = PythonOperator(...)
        transform_orders = PythonOperator(...)
        load_orders = PythonOperator(...)
        
        extract_orders >> transform_orders >> load_orders
    
    # TaskGroup for analytics
    with TaskGroup('analytics', tooltip='Generate analytics') as analytics_group:
        calculate_metrics = PythonOperator(...)
        generate_dashboard = PythonOperator(...)
        send_notifications = PythonOperator(...)
        
        calculate_metrics >> [generate_dashboard, send_notifications]
    
    end = DummyOperator(task_id='end')
    
    # Dependencies between groups
    start >> [customer_group, order_group] >> analytics_group >> end

Dynamic Task Generation:

from airflow.models import Variable

def create_processing_tasks():
    """Create tasks dynamically from configuration"""
    
    # Read configuration (can come from a Variable, a database, etc.)
    sources = Variable.get('data_sources', deserialize_json=True)
    
    # sources = [
    #     {'name': 'salesforce', 'table': 'accounts'},
    #     {'name': 'hubspot', 'table': 'contacts'},
    #     {'name': 'stripe', 'table': 'payments'}
    # ]
    
    tasks = []
    
    for source in sources:
        task = PythonOperator(
            task_id=f"process_{source['name']}_{source['table']}",
            python_callable=process_source,
            op_kwargs={
                'source_name': source['name'],
                'table_name': source['table']
            },
            dag=dag
        )
        tasks.append(task)
    
    return tasks

def process_source(source_name, table_name):
    """Process a specific source"""
    print(f"Processing {source_name}.{table_name}")
    # Processing logic goes here

# Create the tasks dynamically
start = DummyOperator(task_id='start', dag=dag)
processing_tasks = create_processing_tasks()
end = DummyOperator(task_id='end', dag=dag)

# Dependencies
start >> processing_tasks >> end
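
On Airflow 2.3+ the same fan-out can be expressed with dynamic task mapping, which expands one task definition into one instance per input at runtime. A minimal sketch reusing the process_source callable above:

# One mapped task instance per source, expanded at runtime (Airflow >= 2.3)
process_all = PythonOperator.partial(
    task_id='process_source',
    python_callable=process_source,
).expand(
    op_kwargs=[
        {'source_name': 'salesforce', 'table_name': 'accounts'},
        {'source_name': 'hubspot', 'table_name': 'contacts'},
    ]
)

start >> process_all >> end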

Monitoring and Alerting:

from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email

def custom_failure_alert(context):
    """Custom callback invoked when a task fails"""
    task_instance = context['task_instance']
    exception = context.get('exception')
    
    subject = f"❌ Airflow Task Failed: {task_instance.task_id}"
    
    html_content = f"""
    <h3>Task Failed</h3>
    <ul>
        <li><b>DAG:</b> {task_instance.dag_id}</li>
        <li><b>Task:</b> {task_instance.task_id}</li>
        <li><b>Execution Date:</b> {context['execution_date']}</li>
        <li><b>Error:</b> {exception}</li>
        <li><b>Log URL:</b> {task_instance.log_url}</li>
    </ul>
    """
    
    send_email(
        to=['data-team@example.com'],
        subject=subject,
        html_content=html_content
    )

def data_quality_check(**context):
    """Run data quality checks"""
    # Checks (check_nulls, check_duplicates, and check_schema are
    # project-specific helpers, not defined here)
    checks = {
        'null_check': check_nulls(),
        'duplicate_check': check_duplicates(),
        'schema_check': check_schema()
    }
    
    failed_checks = [k for k, v in checks.items() if not v]
    
    if failed_checks:
        raise ValueError(f"Data quality checks failed: {failed_checks}")
    
    print("✅ All data quality checks passed")

quality_check = PythonOperator(
    task_id='data_quality_check',
    python_callable=data_quality_check,
    on_failure_callback=custom_failure_alert,
    dag=dag
)

Useful Airflow commands:

# Initialize the metadata database
airflow db init

# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# List DAGs
airflow dags list

# Run a whole DAG locally for a given date (executes the tasks, no scheduler needed)
airflow dags test daily_sales_etl 2024-01-01

# Test a single task
airflow tasks test daily_sales_etl extract_sales_data 2024-01-01

# Trigger a DAG manually
airflow dags trigger daily_sales_etl

# List DAG runs and their state
airflow dags list-runs -d daily_sales_etl

# Pause/unpause a DAG
airflow dags pause daily_sales_etl
airflow dags unpause daily_sales_etl

# Check the state of a task instance (task logs live in the UI or the logs folder)
airflow tasks state daily_sales_etl extract_sales_data 2024-01-01

# Delete all metadata for a DAG (irreversible; removes its run history)
airflow dags delete daily_sales_etl

Recipe 5.3: Apache Spark - Distributed Processing

What is Spark? A distributed processing engine for big data, far faster than MapReduce thanks to in-memory execution.

Key concepts:

- Driver and executors: the driver plans the job; executors run its tasks in parallel
- DataFrame: a distributed table with a SQL-like API
- Lazy evaluation: transformations only build a plan; an action (count, write, collect) executes it (see the sketch below)
- Partitions: the units of parallelism; shuffles redistribute data between them
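
A tiny illustration of lazy evaluation (assuming an existing spark session and a local sales.csv file):

df = spark.read.option("header", "true").csv("sales.csv")
filtered = df.filter(df["total_amount"].cast("double") > 100)  # lazy: returns immediately, nothing is read yet
print(filtered.count())  # action: now the file is actually scanned and filtered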

Spark with PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# ===== INITIALIZE SPARK =====
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# ===== READ DATA =====

# CSV
df_sales = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://bucket/sales/*.csv")

# Parquet (optimized columnar format)
df_customers = spark.read.parquet("s3://bucket/customers/")

# JSON
df_events = spark.read.json("s3://bucket/events/*.json")

# Database
df_products = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "products") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

# ===== BASIC TRANSFORMATIONS =====

# Select columns
df_selected = df_sales.select(
    "order_id",
    "customer_id",
    "total_amount",
    "order_date"
)

# Filter
df_filtered = df_sales.filter(
    (col("total_amount") > 100) & 
    (col("order_date") >= "2024-01-01")
)

# Add columns
df_enriched = df_sales.withColumn(
    "order_year",
    year(col("order_date"))
).withColumn(
    "revenue_tier",
    when(col("total_amount") < 100, "Low")
    .when(col("total_amount") < 500, "Medium")
    .otherwise("High")
)

# Rename columns
df_renamed = df_sales.withColumnRenamed("total_amount", "revenue")

# Drop columns
df_dropped = df_sales.drop("unnecessary_column")

# ===== AGGREGATIONS =====

# Group by and aggregate
df_summary = df_sales.groupBy("customer_id") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("total_amount").alias("total_revenue"),
        avg("total_amount").alias("avg_order_value"),
        max("order_date").alias("last_order_date")
    )

# Group by multiple keys
df_monthly = df_sales.groupBy(
    year("order_date").alias("year"),
    month("order_date").alias("month")
).agg(
    sum("total_amount").alias("monthly_revenue"),
    count("order_id").alias("order_count")
).orderBy("year", "month")

# ===== JOINS =====

# Inner join
df_orders_with_customers = df_sales.join(
    df_customers,
    df_sales.customer_id == df_customers.id,
    "inner"
)

# Left join
df_all_orders = df_sales.join(
    df_customers,
    df_sales.customer_id == df_customers.id,
    "left"
)

# Broadcast join (for small tables)
from pyspark.sql.functions import broadcast

df_with_products = df_sales.join(
    broadcast(df_products),  # df_products is small
    df_sales.product_id == df_products.id
)

# ===== WINDOW FUNCTIONS =====

# Ranking
window_spec = Window.partitionBy("customer_id").orderBy(desc("order_date"))

df_ranked = df_sales.withColumn(
    "order_rank",
    row_number().over(window_spec)
).withColumn(
    "revenue_rank",
    rank().over(Window.partitionBy("customer_id").orderBy(desc("total_amount")))
)

# Running total
window_running = Window.partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_cumulative = df_sales.withColumn(
    "cumulative_revenue",
    sum("total_amount").over(window_running)
)

# Moving average
window_moving = Window.partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(-6, 0)  # Last 7 rows (current + 6 preceding)

df_moving_avg = df_sales.withColumn(
    "moving_avg_7",
    avg("total_amount").over(window_moving)
)

# ===== DATA CLEANING =====

# Drop nulls
df_clean = df_sales.dropna(subset=["customer_id", "order_date"])

# Fill nulls
df_filled = df_sales.fillna({
    "discount": 0,
    "notes": "No notes",
    "category": "Uncategorized"
})

# Remove duplicates
df_deduped = df_sales.dropDuplicates(["order_id"])

# ===== UDFs (User Defined Functions) =====

from pyspark.sql.types import StringType

def categorize_customer(total_revenue):
    """Categorize a customer by revenue"""
    if total_revenue > 10000:
        return "VIP"
    elif total_revenue > 5000:
        return "Premium"
    elif total_revenue > 1000:
        return "Standard"
    else:
        return "Basic"

# Register the UDF
categorize_udf = udf(categorize_customer, StringType())

# Apply the UDF
df_categorized = df_summary.withColumn(
    "customer_category",
    categorize_udf(col("total_revenue"))
)
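
# NOTE (added sketch, not part of the original recipe): row-at-a-time Python UDFs
# are slow because every value crosses the JVM/Python boundary; a vectorized
# pandas UDF processes whole columns at once and is usually much faster.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(StringType())
def categorize_customer_vec(total_revenue: pd.Series) -> pd.Series:
    # Same tiers as categorize_customer, computed column-wise
    return pd.cut(
        total_revenue,
        bins=[float('-inf'), 1000, 5000, 10000, float('inf')],
        labels=['Basic', 'Standard', 'Premium', 'VIP']
    ).astype(str)

df_categorized_fast = df_summary.withColumn(
    "customer_category",
    categorize_customer_vec(col("total_revenue"))
)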

# ===== OPTIMIZATIONS =====

# 1. Cache a DataFrame that will be reused
df_sales.cache()
df_sales.count()  # Trigger cache

# 2. Repartition to balance the load
df_repartitioned = df_sales.repartition(200, "customer_id")

# 3. Coalesce to reduce partitions (after a filter)
df_small = df_filtered.coalesce(10)

# 4. Broadcast hint for joins
df_optimized = df_sales.join(
    broadcast(df_products),
    "product_id"
)

# ===== WRITE RESULTS =====

# Parquet (recommended for analytics); df_monthly carries the year/month
# columns used for partitioning
df_monthly.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://bucket/output/summary/")

# CSV
df_summary.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3://bucket/output/summary.csv")

# Database
df_summary.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "customer_summary") \
    .option("user", "user") \
    .option("password", "pass") \
    .mode("overwrite") \
    .save()

# Delta Lake (versioned, ACID)
df_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3://bucket/delta/summary")

# ===== END-TO-END USE CASE =====

class SparkETLPipeline:
    """ETL pipeline on Spark"""
    
    def __init__(self, spark: SparkSession):
        self.spark = spark
    
    def extract_sales_data(self, date_from: str, date_to: str):
        """Extraer datos de ventas"""
        df = self.spark.read.parquet("s3://bucket/sales/") \
            .filter(
                (col("order_date") >= date_from) &
                (col("order_date") <= date_to)
            )
        
        print(f"📥 Extracted {df.count()} sales records")
        return df
    
    def enrich_with_customer_data(self, df_sales):
        """Enriquecer con datos de clientes"""
        df_customers = self.spark.read.parquet("s3://bucket/customers/")
        
        df_enriched = df_sales.join(
            df_customers,
            df_sales.customer_id == df_customers.id,
            "left"
        ).select(
            df_sales["*"],
            df_customers["customer_name"],
            df_customers["customer_segment"],
            df_customers["customer_lifetime_value"]
        )
        
        return df_enriched
    
    def calculate_metrics(self, df):
        """Calcular métricas de negocio"""
        
        # Customer-level metrics
        df_customer_metrics = df.groupBy("customer_id", "customer_name") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("total_amount").alias("total_revenue"),
                avg("total_amount").alias("avg_order_value"),
                max("order_date").alias("last_purchase_date"),
                min("order_date").alias("first_purchase_date")
            )
        
        # Add derived metrics
        df_customer_metrics = df_customer_metrics.withColumn(
            "customer_tenure_days",
            datediff(col("last_purchase_date"), col("first_purchase_date"))
        ).withColumn(
            "avg_days_between_orders",
            col("customer_tenure_days") / col("total_orders")
        )
        
        # Product-level metrics
        df_product_metrics = df.groupBy("product_id", "product_name") \
            .agg(
                count("order_id").alias("times_sold"),
                sum("quantity").alias("total_quantity"),
                sum("total_amount").alias("total_revenue")
            ) \
            .orderBy(desc("total_revenue"))
        
        return df_customer_metrics, df_product_metrics
    
    def run(self, date_from: str, date_to: str):
        """Ejecutar pipeline completo"""
        print(f"\n{'='*60}")
        print(f"🚀 Starting Spark ETL Pipeline")
        print(f"{'='*60}\n")
        
        # Extract
        df_sales = self.extract_sales_data(date_from, date_to)
        
        # Transform
        df_enriched = self.enrich_with_customer_data(df_sales)
        df_customer_metrics, df_product_metrics = self.calculate_metrics(df_enriched)
        
        # Load
        df_customer_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/customer_metrics/")
        
        df_product_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/product_metrics/")
        
        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")

# Usage
pipeline = SparkETLPipeline(spark)
pipeline.run("2024-01-01", "2024-12-31")

Spark Structured Streaming (real-time processing):

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("StreamProcessing") \
    .getOrCreate()

# Read a stream from Kafka
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sales-events") \
    .load()

# Parse JSON
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

df_parsed = df_stream.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Windowed aggregation (5-minute windows, sliding every minute)
df_windowed = df_parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "customer_id"
    ) \
    .agg(
        count("order_id").alias("order_count"),
        sum("amount").alias("total_amount")
    )

# Write the stream (file sinks only support append mode; with the watermark,
# each window is emitted once it can no longer receive late data)
query = df_windowed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/streaming-output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()

query.awaitTermination()
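
During development it is convenient to print the stream instead of writing to S3. A minimal debug sketch for the same df_windowed (console sink, local use only):

# Debug sink: print each micro-batch to stdout instead of persisting it
debug_query = df_windowed.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

debug_query.awaitTermination()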

Recipe 5.4: Data Warehousing vs Data Lakes

What are they?

Data Warehouse: a central repository of structured, modeled data, optimized for SQL analytics and BI reporting.

Data Lake: low-cost object storage holding raw data of any type (structured, semi-structured, unstructured) for later processing.

Comparison:

| Feature | Data Warehouse | Data Lake |
|---|---|---|
| Data | Structured | All types |
| Schema | Schema-on-write | Schema-on-read |
| Users | Business analysts | Data scientists, engineers |
| Processing | SQL | SQL, Python, Spark |
| Cost | 💰💰💰 High | 💰 Low |
| Performance | 🟢 Fast | 🟡 Depends |
| Flexibility | 🔴 Rigid | 🟢 Very flexible |
| Use cases | BI, reporting | ML, exploration, archives |

Data Lakehouse architecture (the best of both):

┌─────────────────────────────────────────────────────────────┐
│                    Data Sources                              │
├─────────────────────────────────────────────────────────────┤
│  Databases │ APIs │ SaaS │ Logs │ IoT │ Files              │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (Ingestion)
┌─────────────────────────────────────────────────────────────┐
│                Raw Layer (Data Lake)                         │
├─────────────────────────────────────────────────────────────┤
│  S3/Azure Data Lake/GCS                                     │
│  • Formats: JSON, CSV, Avro, Parquet                        │
│  • No transformation                                         │
│  • Partitioned by date                                       │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (ETL/ELT)
┌─────────────────────────────────────────────────────────────┐
│              Processed Layer (Bronze → Silver)               │
├─────────────────────────────────────────────────────────────┤
│  Delta Lake / Apache Iceberg                                │
│  • Format: Parquet with metadata                            │
│  • Cleaned and validated                                     │
│  • ACID transactions                                         │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (Transformations)
┌─────────────────────────────────────────────────────────────┐
│              Curated Layer (Gold)                            │
├─────────────────────────────────────────────────────────────┤
│  • Dimensional modeling (star schema)                       │
│  • Aggregations and metrics                                  │
│  • Optimized for queries                                     │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (Consumption)
┌─────────────────────────────────────────────────────────────┐
│                 Consumption Layer                            │
├─────────────────────────────────────────────────────────────┤
│  BI Tools │ ML Models │ Apps │ APIs                         │
└─────────────────────────────────────────────────────────────┘

Medallion Architecture (Bronze → Silver → Gold):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ===== BRONZE LAYER: Raw data =====

def ingest_to_bronze(source_path: str, bronze_path: str):
    """Land raw data without transformation"""
    
    df_raw = spark.read.json(source_path)
    
    # Add ingestion metadata (including the date column used for partitioning)
    df_bronze = df_raw \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_ingested_date", to_date(current_timestamp())) \
        .withColumn("_source_file", input_file_name())
    
    # Write to Bronze (append only)
    df_bronze.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingested_date") \
        .save(bronze_path)
    
    print(f"✅ Ingested {df_bronze.count()} records to Bronze")

# ===== SILVER LAYER: Clean data =====

def bronze_to_silver(bronze_path: str, silver_path: str):
    """Clean and validate the data"""
    
    df_bronze = spark.read.format("delta").load(bronze_path)
    
    # Transformations
    df_silver = df_bronze \
        .dropna(subset=["order_id", "customer_id"]) \
        .dropDuplicates(["order_id"]) \
        .withColumn("order_date", to_date("order_date")) \
        .withColumn("total_amount", col("total_amount").cast("decimal(10,2)")) \
        .filter(col("total_amount") > 0) \
        .select(
            "order_id",
            "customer_id",
            "order_date",
            "total_amount",
            "status",
            "_ingested_at"
        )
    
    # Incremental merge (upsert)
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_silver = DeltaTable.forPath(spark, silver_path)
        
        delta_silver.alias("target").merge(
            df_silver.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        df_silver.write \
            .format("delta") \
            .mode("overwrite") \
            .save(silver_path)
    
    print(f"✅ Processed {df_silver.count()} records to Silver")

# ===== GOLD LAYER: Aggregates for analytics =====

def silver_to_gold(silver_path: str, gold_path: str):
    """Build aggregated tables for BI"""
    
    df_silver = spark.read.format("delta").load(silver_path)
    
    # Metric: revenue per day
    df_daily_revenue = df_silver \
        .groupBy("order_date") \
        .agg(
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_order_value")
        ) \
        .orderBy("order_date")
    
    df_daily_revenue.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/daily_revenue")
    
    # Metric: customer lifetime value
    window_spec = Window.partitionBy("customer_id").orderBy("order_date")
    
    df_customer_ltv = df_silver \
        .withColumn("cumulative_revenue", sum("total_amount").over(window_spec)) \
        .groupBy("customer_id") \
        .agg(
            count("order_id").alias("total_orders"),
            max("cumulative_revenue").alias("lifetime_value"),
            max("order_date").alias("last_order_date")
        )
    
    df_customer_ltv.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/customer_ltv")
    
    print("✅ Created Gold layer tables")

# Full pipeline
def run_medallion_pipeline():
    """Run the full Bronze → Silver → Gold pipeline"""
    
    # Paths
    source_path = "s3://raw-data/orders/*.json"
    bronze_path = "s3://datalake/bronze/orders"
    silver_path = "s3://datalake/silver/orders"
    gold_path = "s3://datalake/gold"
    
    # Execute layers
    ingest_to_bronze(source_path, bronze_path)
    bronze_to_silver(bronze_path, silver_path)
    silver_to_gold(silver_path, gold_path)
    
    print("\n✅ Medallion pipeline completed")

run_medallion_pipeline()

Time travel with Delta Lake:

# Read a specific version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path")

# Read the snapshot as of a timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("path")

# Inspect the change history
delta_table = DeltaTable.forPath(spark, "path")
delta_table.history().show()

# Roll back to an earlier version
delta_table.restoreToVersion(5)
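
Time travel only reaches as far back as old files are retained. A minimal sketch of trimming history with VACUUM (the 7-day horizon is an example; pick yours deliberately):

# Delete files older than the retention window; versions beyond it
# are no longer reachable via time travel
delta_table.vacuum(retentionHours=168)  # keep 7 days of history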

Recipe 5.5: Stream Processing with Kafka

What is Kafka? A distributed streaming platform for publishing, subscribing to, and processing data streams in real time.

Key concepts:

- Topic: a named stream of events, split into partitions
- Partition: an ordered, append-only log; the unit of parallelism and ordering guarantees
- Producer / Consumer: clients that write events to and read events from topics
- Consumer group: a set of consumers that share a topic's partitions to scale out
- Offset: a consumer's position within a partition

Because partition count caps consumer parallelism, topics are usually created explicitly, as in the sketch below.
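
A minimal sketch of creating the topic used below with kafka-python's admin client (broker address and partition count are assumptions):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# 6 partitions -> up to 6 consumers in one group can read in parallel
admin.create_topics([
    NewTopic(name='sales-events', num_partitions=6, replication_factor=1)
])
admin.close()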

Producer (Python):

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
from datetime import datetime
import time

class SalesEventProducer:
    """Sales event producer"""
    
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for acknowledgment from all replicas
            retries=3,
            max_in_flight_requests_per_connection=1  # Preserve ordering across retries
        )
        self.topic = 'sales-events'
    
    def send_event(self, event: dict):
        """Send a sales event"""
        
        # Partitioning key (all events for the same customer land on the same partition)
        key = event.get('customer_id')
        
        # Add a timestamp
        event['timestamp'] = datetime.now().isoformat()
        
        # Send asynchronously
        future = self.producer.send(
            self.topic,
            key=key,
            value=event
        )
        
        # Block on the future to surface success or failure
        try:
            record_metadata = future.get(timeout=10)
            print(f"✅ Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")
        except KafkaError as e:
            print(f"❌ Failed to send: {e}")
    
    def send_batch(self, events: list):
        """Send multiple events"""
        for event in events:
            self.send_event(event)
        
        # Flush to make sure everything was delivered
        self.producer.flush()
    
    def close(self):
        """Close the producer"""
        self.producer.close()

# Usage
producer = SalesEventProducer()

# Simulate events
for i in range(100):
    event = {
        'order_id': f'order_{i}',
        'customer_id': f'customer_{i % 10}',  # 10 customers
        'product_id': f'product_{i % 5}',
        'quantity': 1,
        'amount': 99.99
    }
    producer.send_event(event)
    time.sleep(0.1)

producer.close()

Consumer (Python):

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

class SalesEventConsumer:
    """Sales event consumer"""
    
    def __init__(
        self,
        bootstrap_servers=['localhost:9092'],
        group_id='sales-processor'
    ):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',  # 'earliest' or 'latest'
            enable_auto_commit=False,  # Manual commits for precise control
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=100  # Batch size
        )
    
    def process_events(self):
        """Process events continuously"""
        
        try:
            for message in self.consumer:
                # Get the event
                event = message.value
                
                print(f"\n📨 Received from partition {message.partition}, offset {message.offset}")
                print(f"Event: {event}")
                
                # Process the event
                try:
                    self.process_single_event(event)
                    
                    # Commit the offset only after successful processing
                    self.consumer.commit()
                    
                except Exception as e:
                    print(f"❌ Error processing event: {e}")
                    # Do not commit; the event is redelivered after a restart or rebalance
        
        except KeyboardInterrupt:
            print("\n🛑 Stopping consumer...")
        finally:
            self.consumer.close()
    
    def process_single_event(self, event: dict):
        """Process a single event"""
        
        # Validate the event
        if not event.get('order_id'):
            raise ValueError("Missing order_id")
        
        # Business logic
        print(f"Processing order {event['order_id']} for customer {event['customer_id']}")
        
        # Save to the database, send a notification, etc.
        # save_to_database(event)
        # send_notification(event)
    
    def process_batch(self):
        """Process events in batches"""
        
        batch = []
        batch_size = 100
        
        try:
            for message in self.consumer:
                batch.append(message.value)
                
                if len(batch) >= batch_size:
                    # Process the batch
                    self.process_batch_events(batch)
                    
                    # Commit
                    self.consumer.commit()
                    
                    batch = []
        
        except KeyboardInterrupt:
            # Process the remaining batch
            if batch:
                self.process_batch_events(batch)
                self.consumer.commit()
        finally:
            self.consumer.close()
    
    def process_batch_events(self, events: list):
        """Process a batch of events"""
        print(f"📦 Processing batch of {len(events)} events")
        
        # Bulk insert into a database, etc.
        # bulk_insert_to_db(events)

# Usage
consumer = SalesEventConsumer()
consumer.process_events()

Kafka Streams-style processing (conceptual sketch in Python; Kafka Streams itself is a Java library):

from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime, timedelta

class StreamProcessor:
    """Stream processing with windowed aggregations"""
    
    def __init__(self):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=['localhost:9092'],
            group_id='stream-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # State store (in production, use RocksDB or similar)
        self.state = defaultdict(lambda: {
            'total_amount': 0,
            'count': 0,
            'window_start': None
        })
    
    def process_stream(self):
        """Process the stream with per-customer time windows"""
        
        window_duration = timedelta(minutes=5)
        
        for message in self.consumer:
            event = message.value
            customer_id = event['customer_id']
            current_time = datetime.now()
            state = self.state[customer_id]
            
            # Open a window on the first event for this customer
            if state['window_start'] is None:
                state['window_start'] = current_time
            
            # If the window expired, emit the aggregate before accumulating
            if current_time - state['window_start'] >= window_duration and state['count'] > 0:
                result = {
                    'customer_id': customer_id,
                    'window_start': state['window_start'].isoformat(),
                    'window_end': current_time.isoformat(),
                    'total_amount': state['total_amount'],
                    'order_count': state['count'],
                    'avg_order_value': state['total_amount'] / state['count']
                }
                
                # Send to the results topic
                self.producer.send('customer-metrics', value=result)
                
                # Reset state for the next window
                state = self.state[customer_id] = {
                    'total_amount': 0,
                    'count': 0,
                    'window_start': current_time
                }
            
            # Accumulate the current event into the window
            state['total_amount'] += event['amount']
            state['count'] += 1
            
            # Commit
            self.consumer.commit()

# Usage
processor = StreamProcessor()
processor.process_stream()

Kafka with Apache Flink (advanced processing):

# Conceptual example (Flink pipelines are usually written in Java/Scala;
# PyFlink import paths vary by version, so treat these as assumptions)

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingEventTimeWindows

# Setup environment
env = StreamExecutionEnvironment.get_execution_environment()

# Kafka source
kafka_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'flink-consumer'
}

kafka_source = FlinkKafkaConsumer(
    topics='sales-events',
    deserialization_schema=SimpleStringSchema(),
    properties=kafka_props
)

# Stream pipeline
stream = env.add_source(kafka_source)

# Parse JSON and key by customer
parsed_stream = stream.map(lambda x: json.loads(x)) \
    .key_by(lambda x: x['customer_id'])

# Windowed aggregation (CustomerRevenueAggregator is an application-specific
# AggregateFunction, not defined here)
result_stream = parsed_stream \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .aggregate(CustomerRevenueAggregator())

# Sink to output (kafka_sink is an application-specific producer, not defined here)
result_stream.add_sink(kafka_sink)

# Execute
env.execute("Sales Stream Processing")