📊 Developer Cookbook - Phase 5: Data Engineering

Practical recipes for building robust, scalable data pipelines


📚 Table of Contents

  1. Data Pipelines
  2. SQL and Python for Data Engineering

Data Pipelines

Recipe 5.1: ETL vs ELT - Which One Should You Use?

What are they? ETL (Extract, Transform, Load) transforms data before loading it into the target system; ELT (Extract, Load, Transform) loads raw data first and transforms it inside the warehouse.

Comparison:

| Characteristic | ETL | ELT |
|----------------|-----|-----|
| Transformation | Before loading | After loading |
| Initial speed | 🟡 Slower | 🟢 Faster |
| Flexibility | 🔴 Less flexible | 🟢 Very flexible |
| Compute cost | 🟢 Predictable | 🟡 Variable |
| Best for | Traditional data warehouses | Cloud data warehouses |
| Storage | 🟢 Less storage | 🔴 More storage |
| Use cases | Sensitive data, compliance | Exploration, analytics |

ETL example in Python:

import pandas as pd
from datetime import datetime
import psycopg2
from typing import Dict, List

class ETLPipeline:
    """Traditional ETL pipeline"""
    
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
    
    def extract(self, query: str) -> pd.DataFrame:
        """E - Extract: pull data from the source"""
        print("📥 Extracting data from source...")
        
        # Connect to the source database
        conn = psycopg2.connect(**self.source_config)
        
        # Extract the data
        df = pd.read_sql(query, conn)
        conn.close()
        
        print(f"✅ Extracted {len(df)} rows")
        return df
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """T - Transform: clean and transform the data"""
        print("⚙️ Transforming data...")
        
        # 1. Drop rows with null keys
        df = df.dropna(subset=['customer_id', 'order_date'])
        
        # 2. Normalize text columns
        df['customer_name'] = df['customer_name'].str.strip().str.title()
        df['email'] = df['email'].str.lower()
        
        # 3. Convert data types
        df['order_date'] = pd.to_datetime(df['order_date'])
        df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')
        
        # 4. Create derived columns
        df['order_year'] = df['order_date'].dt.year
        df['order_month'] = df['order_date'].dt.month
        df['order_quarter'] = df['order_date'].dt.quarter
        
        # 5. Segment customers
        df['customer_segment'] = pd.cut(
            df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )
        
        # 6. Remove duplicates
        df = df.drop_duplicates(subset=['order_id'])
        
        # 7. Add ETL metadata
        df['etl_processed_at'] = datetime.now()
        df['etl_version'] = '1.0'
        
        print(f"✅ Transformed to {len(df)} rows")
        return df
    
    def load(self, df: pd.DataFrame, table_name: str):
        """L - Load: load data into the target"""
        print(f"📤 Loading data to {table_name}...")
        
        # Connect to the target database
        conn = psycopg2.connect(**self.target_config)
        cursor = conn.cursor()
        
        # Truncate the table if it exists (full refresh)
        cursor.execute(f"TRUNCATE TABLE {table_name}")
        
        # Bulk insert
        from io import StringIO
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        
        cursor.copy_from(
            buffer,
            table_name,
            sep=',',
            null='',
            columns=df.columns.tolist()
        )
        
        conn.commit()
        cursor.close()
        conn.close()
        
        print(f"✅ Loaded {len(df)} rows to {table_name}")
    
    def run(self, extract_query: str, target_table: str):
        """Ejecutar pipeline completo"""
        start_time = datetime.now()
        print(f"\n{'='*60}")
        print(f"🚀 Starting ETL Pipeline - {start_time}")
        print(f"{'='*60}\n")
        
        try:
            # Extract
            raw_data = self.extract(extract_query)
            
            # Transform
            clean_data = self.transform(raw_data)
            
            # Load
            self.load(clean_data, target_table)
            
            # Log success
            duration = (datetime.now() - start_time).total_seconds()
            print(f"\n{'='*60}")
            print(f"✅ ETL Pipeline completed successfully in {duration:.2f}s")
            print(f"{'='*60}\n")
            
        except Exception as e:
            print(f"\n❌ ETL Pipeline failed: {str(e)}")
            raise

# Usage
source_config = {
    'host': 'source-db.example.com',
    'database': 'production',
    'user': 'etl_user',
    'password': 'password'
}

target_config = {
    'host': 'warehouse.example.com',
    'database': 'analytics',
    'user': 'warehouse_user',
    'password': 'password'
}

pipeline = ETLPipeline(source_config, target_config)

extract_query = """
    SELECT 
        o.order_id,
        o.customer_id,
        c.customer_name,
        c.email,
        o.order_date,
        o.total_amount
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
"""

pipeline.run(extract_query, 'dim_orders')

ELT example with dbt (Data Build Tool):

-- models/staging/stg_orders.sql
-- Step 1: expose the raw data (already loaded in the warehouse)
{{
  config(
    materialized='view'
  )
}}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    _loaded_at
FROM {{ source('raw', 'orders') }}

-- models/intermediate/int_orders_cleaned.sql
-- Step 2: clean the data
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

SELECT
    order_id,
    customer_id,
    CAST(order_date AS DATE) as order_date,
    ROUND(total_amount, 2) as total_amount,
    EXTRACT(YEAR FROM order_date) as order_year,
    EXTRACT(MONTH FROM order_date) as order_month,
    EXTRACT(QUARTER FROM order_date) as order_quarter
FROM {{ ref('stg_orders') }}
WHERE order_date IS NOT NULL
  AND total_amount > 0

{% if is_incremental() %}
  -- Only process new records
  AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

-- models/marts/fct_orders.sql
-- Step 3: build the final table for analytics
{{
  config(
    materialized='table'
  )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_date,
    o.order_year,
    o.order_month,
    o.total_amount,
    CASE
        WHEN o.total_amount < 100 THEN 'Bronze'
        WHEN o.total_amount < 500 THEN 'Silver'
        WHEN o.total_amount < 1000 THEN 'Gold'
        ELSE 'Platinum'
    END as order_tier
FROM {{ ref('int_orders_cleaned') }} o
LEFT JOIN {{ ref('dim_customers') }} c 
    ON o.customer_id = c.customer_id

When to use each:

ETL: sensitive data and compliance requirements (clean or mask before loading), traditional on-premise warehouses, stable and well-defined transformations.

ELT: cloud data warehouses with cheap, elastic compute, exploratory analytics, and cases where you want the raw data kept available for reprocessing.


Recipe 5.2: Apache Airflow - Pipeline Orchestration

What is Airflow? A platform to author, schedule, and monitor data workflows as code (DAGs).

Key concepts: a DAG is the workflow graph; each node is a task created by an operator (PythonOperator, BashOperator, ...); sensors are tasks that wait for a condition; XCom passes small messages between tasks; the scheduler and executor decide when and where tasks run.

Basic DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd

# Default arguments applied to every task
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1)
}

# Define the DAG
dag = DAG(
    'daily_sales_etl',
    default_args=default_args,
    description='Daily sales ETL pipeline',
    schedule_interval='0 2 * * *',  # Every day at 2am
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'etl', 'daily']
)

# Task 1: wait for the input file to exist
wait_for_file = FileSensor(
    task_id='wait_for_sales_file',
    filepath='/data/sales/{{ ds }}.csv',  # {{ ds }} = execution date
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Give up after 1 hour
    mode='poke',
    dag=dag
)

# Task 2: extract data
def extract_sales(**context):
    """Extract data from the file"""
    execution_date = context['ds']
    
    df = pd.read_csv(f'/data/sales/{execution_date}.csv')
    
    # Push the row count to XCom for downstream tasks
    context['task_instance'].xcom_push(
        key='row_count',
        value=len(df)
    )
    
    print(f"✅ Extracted {len(df)} rows")
    return df.to_json()

extract_task = PythonOperator(
    task_id='extract_sales_data',
    python_callable=extract_sales,
    provide_context=True,
    dag=dag
)

# Task 3: transform data
def transform_sales(**context):
    """Transform the data"""
    # Pull the raw data from the previous task via XCom
    raw_data = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data'
    )
    
    df = pd.read_json(raw_data)
    
    # Transformations
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df['revenue'] = df['quantity'] * df['price']
    df = df.dropna()
    
    print(f"✅ Transformed {len(df)} rows")
    return df.to_json()

transform_task = PythonOperator(
    task_id='transform_sales_data',
    python_callable=transform_sales,
    provide_context=True,
    dag=dag
)

# Task 4: load into the database
def load_to_database(**context):
    """Load data into the database"""
    from sqlalchemy import create_engine
    
    transformed_data = context['task_instance'].xcom_pull(
        task_ids='transform_sales_data'
    )
    
    df = pd.read_json(transformed_data)
    
    engine = create_engine('postgresql://user:pass@host:5432/db')
    df.to_sql(
        'sales_fact',
        engine,
        if_exists='append',
        index=False
    )
    
    print(f"✅ Loaded {len(df)} rows to database")

load_task = PythonOperator(
    task_id='load_to_database',
    python_callable=load_to_database,
    provide_context=True,
    dag=dag
)

# Task 5: generate the daily report
generate_report = BashOperator(
    task_id='generate_daily_report',
    bash_command="""
        python /scripts/generate_report.py \
            --date {{ ds }} \
            --output /reports/sales_{{ ds }}.pdf
    """,
    dag=dag
)

# Task 6: Cleanup
cleanup = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/sales_{{ ds }}*',
    dag=dag
)

# Define dependencies (execution order)
wait_for_file >> extract_task >> transform_task >> load_task >> generate_report >> cleanup
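
The same pipeline can be written more compactly with the TaskFlow API (Airflow 2.x); a minimal sketch (the file path and connection string are illustrative), where XCom passing happens implicitly through return values:

from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd

@dag(schedule_interval='0 2 * * *', start_date=datetime(2024, 1, 1), catchup=False, tags=['sales', 'etl'])
def daily_sales_etl_taskflow():

    @task
    def extract(ds=None):
        # ds (the execution date) is injected from the Airflow context
        return pd.read_csv(f'/data/sales/{ds}.csv').to_json()

    @task
    def transform(raw: str):
        df = pd.read_json(raw)
        df['revenue'] = df['quantity'] * df['price']
        return df.dropna().to_json()

    @task
    def load(transformed: str):
        from sqlalchemy import create_engine
        df = pd.read_json(transformed)
        engine = create_engine('postgresql://user:pass@host:5432/db')
        df.to_sql('sales_fact', engine, if_exists='append', index=False)

    # Dependencies are inferred from the function calls
    load(transform(extract()))

daily_sales_etl_taskflow()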

DAG with branching (conditional execution):

from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def check_data_quality(**context):
    """Decidir siguiente paso basado en calidad de datos"""
    row_count = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data',
        key='row_count'
    )
    
    if row_count > 1000:
        return 'process_large_dataset'
    elif row_count > 0:
        return 'process_small_dataset'
    else:
        return 'skip_processing'

branch_task = BranchPythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    provide_context=True,
    dag=dag
)

process_large = PythonOperator(
    task_id='process_large_dataset',
    python_callable=lambda: print("Processing large dataset with Spark"),
    dag=dag
)

process_small = PythonOperator(
    task_id='process_small_dataset',
    python_callable=lambda: print("Processing small dataset with Pandas"),
    dag=dag
)

skip = DummyOperator(
    task_id='skip_processing',
    dag=dag
)

join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',  # Continue if at least one branch succeeded
    dag=dag
)

# Dependencies
extract_task >> branch_task >> [process_large, process_small, skip] >> join

TaskGroups (organizing related tasks):

from airflow.utils.task_group import TaskGroup

with DAG('complex_pipeline', ...) as dag:
    
    start = DummyOperator(task_id='start')
    
    # TaskGroup for customer processing
    with TaskGroup('process_customers', tooltip='Customer data processing') as customer_group:
        extract_customers = PythonOperator(...)
        transform_customers = PythonOperator(...)
        load_customers = PythonOperator(...)
        
        extract_customers >> transform_customers >> load_customers
    
    # TaskGroup for order processing
    with TaskGroup('process_orders', tooltip='Order data processing') as order_group:
        extract_orders = PythonOperator(...)
        transform_orders = PythonOperator(...)
        load_orders = PythonOperator(...)
        
        extract_orders >> transform_orders >> load_orders
    
    # TaskGroup for analytics
    with TaskGroup('analytics', tooltip='Generate analytics') as analytics_group:
        calculate_metrics = PythonOperator(...)
        generate_dashboard = PythonOperator(...)
        send_notifications = PythonOperator(...)
        
        calculate_metrics >> [generate_dashboard, send_notifications]
    
    end = DummyOperator(task_id='end')
    
    # Dependencies between groups
    start >> [customer_group, order_group] >> analytics_group >> end

Dynamic Task Generation:

from airflow.models import Variable

def create_processing_tasks():
    """Create tasks dynamically from configuration"""
    
    # Read configuration (can come from an Airflow Variable, a database, etc.)
    sources = Variable.get('data_sources', deserialize_json=True)
    
    # sources = [
    #     {'name': 'salesforce', 'table': 'accounts'},
    #     {'name': 'hubspot', 'table': 'contacts'},
    #     {'name': 'stripe', 'table': 'payments'}
    # ]
    
    tasks = []
    
    for source in sources:
        task = PythonOperator(
            task_id=f"process_{source['name']}_{source['table']}",
            python_callable=process_source,
            op_kwargs={
                'source_name': source['name'],
                'table_name': source['table']
            },
            dag=dag
        )
        tasks.append(task)
    
    return tasks

def process_source(source_name, table_name):
    """Process a specific source"""
    print(f"Processing {source_name}.{table_name}")
    # Processing logic goes here

# Create tasks dynamically
start = DummyOperator(task_id='start', dag=dag)
processing_tasks = create_processing_tasks()
end = DummyOperator(task_id='end', dag=dag)

# Dependencies
start >> processing_tasks >> end
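
Since Airflow 2.3, the same pattern can also be expressed with dynamic task mapping, which creates one mapped task instance per source at runtime; a sketch (to be placed inside a DAG or @dag-decorated function), assuming the same source list shape as above:

from airflow.decorators import task

@task
def list_sources():
    # Could equally come from Variable.get('data_sources', deserialize_json=True)
    return [
        {'name': 'salesforce', 'table': 'accounts'},
        {'name': 'hubspot', 'table': 'contacts'},
        {'name': 'stripe', 'table': 'payments'}
    ]

@task
def process(source: dict):
    print(f"Processing {source['name']}.{source['table']}")

# One mapped task instance is created per element returned by list_sources()
process.expand(source=list_sources())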

Monitoring and Alerting:

from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email

def custom_failure_alert(context):
    """Custom callback invoked when a task fails"""
    task_instance = context['task_instance']
    exception = context.get('exception')
    
    subject = f"❌ Airflow Task Failed: {task_instance.task_id}"
    
    html_content = f"""
    <h3>Task Failed</h3>
    <ul>
        <li><b>DAG:</b> {task_instance.dag_id}</li>
        <li><b>Task:</b> {task_instance.task_id}</li>
        <li><b>Execution Date:</b> {context['execution_date']}</li>
        <li><b>Error:</b> {exception}</li>
        <li><b>Log URL:</b> {task_instance.log_url}</li>
    </ul>
    """
    
    send_email(
        to=['data-team@example.com'],
        subject=subject,
        html_content=html_content
    )

def data_quality_check(**context):
    """Run data quality checks"""
    # Run the checks (check_nulls, check_duplicates and check_schema are user-defined helpers)
    checks = {
        'null_check': check_nulls(),
        'duplicate_check': check_duplicates(),
        'schema_check': check_schema()
    }
    
    failed_checks = [k for k, v in checks.items() if not v]
    
    if failed_checks:
        raise ValueError(f"Data quality checks failed: {failed_checks}")
    
    print("✅ All data quality checks passed")

quality_check = PythonOperator(
    task_id='data_quality_check',
    python_callable=data_quality_check,
    on_failure_callback=custom_failure_alert,
    dag=dag
)

Useful Airflow commands:

# Initialize the metadata database
airflow db init

# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# List DAGs
airflow dags list

# Run the whole DAG for a given execution date (without the scheduler)
airflow dags test daily_sales_etl 2024-01-01

# Test a specific task
airflow tasks test daily_sales_etl extract_sales_data 2024-01-01

# Trigger a DAG manually
airflow dags trigger daily_sales_etl

# List DAG runs and their state
airflow dags list-runs -d daily_sales_etl

# Pause/unpause a DAG
airflow dags pause daily_sales_etl
airflow dags unpause daily_sales_etl

# Task logs live under $AIRFLOW_HOME/logs/ (also viewable in the web UI)

# Delete all metadata for a DAG (removes every DAG run record)
airflow dags delete daily_sales_etl

Recipe 5.3: Apache Spark - Distributed Processing

What is Spark? A distributed processing engine for big data, far faster than MapReduce.

Key concepts: a driver coordinates executors across a cluster; DataFrames are distributed, partitioned datasets; transformations (select, filter, join) are lazy and only execute when an action (count, show, write) is triggered.
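
A minimal illustration of lazy evaluation (the bucket path is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LazyDemo").getOrCreate()

# filter() and select() are transformations: they only build an execution plan
df_recent = spark.read.parquet("s3://bucket/sales/") \
    .filter(col("order_date") >= "2024-01-01") \
    .select("order_id", "total_amount")

# count() is an action: only now does Spark actually schedule and run the job
print(df_recent.count())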

Spark with PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# ===== INITIALIZE SPARK =====
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# ===== READ DATA =====

# CSV
df_sales = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://bucket/sales/*.csv")

# Parquet (optimized columnar format)
df_customers = spark.read.parquet("s3://bucket/customers/")

# JSON
df_events = spark.read.json("s3://bucket/events/*.json")

# Database
df_products = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "products") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

# ===== BASIC TRANSFORMATIONS =====

# Select columns
df_selected = df_sales.select(
    "order_id",
    "customer_id",
    "total_amount",
    "order_date"
)

# Filter
df_filtered = df_sales.filter(
    (col("total_amount") > 100) & 
    (col("order_date") >= "2024-01-01")
)

# Add columns
df_enriched = df_sales.withColumn(
    "order_year",
    year(col("order_date"))
).withColumn(
    "revenue_tier",
    when(col("total_amount") < 100, "Low")
    .when(col("total_amount") < 500, "Medium")
    .otherwise("High")
)

# Rename columns
df_renamed = df_sales.withColumnRenamed("total_amount", "revenue")

# Drop columns
df_dropped = df_sales.drop("unnecessary_column")

# ===== AGGREGATIONS =====

# Group by and aggregate
df_summary = df_sales.groupBy("customer_id") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("total_amount").alias("total_revenue"),
        avg("total_amount").alias("avg_order_value"),
        max("order_date").alias("last_order_date")
    )

# Multiple group by
df_monthly = df_sales.groupBy(
    year("order_date").alias("year"),
    month("order_date").alias("month")
).agg(
    sum("total_amount").alias("monthly_revenue"),
    count("order_id").alias("order_count")
).orderBy("year", "month")

# ===== JOINS =====

# Inner join
df_orders_with_customers = df_sales.join(
    df_customers,
    df_sales.customer_id == df_customers.id,
    "inner"
)

# Left join
df_all_orders = df_sales.join(
    df_customers,
    df_sales.customer_id == df_customers.id,
    "left"
)

# Broadcast join (for small tables)
from pyspark.sql.functions import broadcast

df_with_products = df_sales.join(
    broadcast(df_products),  # df_products is small
    df_sales.product_id == df_products.id
)

# ===== WINDOW FUNCTIONS =====

# Ranking
window_spec = Window.partitionBy("customer_id").orderBy(desc("order_date"))

df_ranked = df_sales.withColumn(
    "order_rank",
    row_number().over(window_spec)
).withColumn(
    "revenue_rank",
    rank().over(Window.partitionBy("customer_id").orderBy(desc("total_amount")))
)

# Running total
window_running = Window.partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_cumulative = df_sales.withColumn(
    "cumulative_revenue",
    sum("total_amount").over(window_running)
)

# Moving average
window_moving = Window.partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(-6, 0)  # Last 7 rows

df_moving_avg = df_sales.withColumn(
    "moving_avg_7",
    avg("total_amount").over(window_moving)
)

# ===== DATA CLEANING =====

# Drop rows with nulls in key columns
df_clean = df_sales.dropna(subset=["customer_id", "order_date"])

# Fill nulls
df_filled = df_sales.fillna({
    "discount": 0,
    "notes": "No notes",
    "category": "Uncategorized"
})

# Remove duplicates
df_deduped = df_sales.dropDuplicates(["order_id"])

# ===== UDFs (User Defined Functions) =====

from pyspark.sql.types import StringType

def categorize_customer(total_revenue):
    """Categorizar cliente por revenue"""
    if total_revenue > 10000:
        return "VIP"
    elif total_revenue > 5000:
        return "Premium"
    elif total_revenue > 1000:
        return "Standard"
    else:
        return "Basic"

# Register the UDF
categorize_udf = udf(categorize_customer, StringType())

# Apply the UDF
df_categorized = df_summary.withColumn(
    "customer_category",
    categorize_udf(col("total_revenue"))
)
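
# Alternative (assumption: Spark 3.x with PyArrow installed): a vectorized pandas UDF
# is usually much faster than a row-wise Python UDF because it processes whole
# pd.Series batches at once. The names below are illustrative.
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def categorize_customer_vec(total_revenue: pd.Series) -> pd.Series:
    return pd.cut(
        total_revenue,
        bins=[-float('inf'), 1000, 5000, 10000, float('inf')],
        labels=['Basic', 'Standard', 'Premium', 'VIP']
    ).astype(str)

df_categorized_vec = df_summary.withColumn(
    "customer_category",
    categorize_customer_vec(col("total_revenue"))
)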

# ===== OPTIMIZATIONS =====

# 1. Cache a DataFrame that will be reused
df_sales.cache()
df_sales.count()  # Trigger cache

# 2. Repartition to balance the load
df_repartitioned = df_sales.repartition(200, "customer_id")

# 3. Coalesce to reduce partitions (after filtering)
df_small = df_filtered.coalesce(10)

# 4. Broadcast hint for joins
df_optimized = df_sales.join(
    broadcast(df_products),
    "product_id"
)

# ===== WRITE RESULTS =====

# Parquet (recommended for analytics)
df_summary.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3://bucket/output/summary/")

# CSV
df_summary.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3://bucket/output/summary.csv")

# Database
df_summary.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "customer_summary") \
    .option("user", "user") \
    .option("password", "pass") \
    .mode("overwrite") \
    .save()

# Delta Lake (versioned, ACID)
df_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3://bucket/delta/summary")

# ===== END-TO-END USE CASE =====

class SparkETLPipeline:
    """ETL pipeline with Spark"""
    
    def __init__(self, spark: SparkSession):
        self.spark = spark
    
    def extract_sales_data(self, date_from: str, date_to: str):
        """Extraer datos de ventas"""
        df = self.spark.read.parquet("s3://bucket/sales/") \
            .filter(
                (col("order_date") >= date_from) &
                (col("order_date") <= date_to)
            )
        
        print(f"📥 Extracted {df.count()} sales records")
        return df
    
    def enrich_with_customer_data(self, df_sales):
        """Enriquecer con datos de clientes"""
        df_customers = self.spark.read.parquet("s3://bucket/customers/")
        
        df_enriched = df_sales.join(
            df_customers,
            df_sales.customer_id == df_customers.id,
            "left"
        ).select(
            df_sales["*"],
            df_customers["customer_name"],
            df_customers["customer_segment"],
            df_customers["customer_lifetime_value"]
        )
        
        return df_enriched
    
    def calculate_metrics(self, df):
        """Calcular métricas de negocio"""
        
        # Customer-level metrics
        df_customer_metrics = df.groupBy("customer_id", "customer_name") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("total_amount").alias("total_revenue"),
                avg("total_amount").alias("avg_order_value"),
                max("order_date").alias("last_purchase_date"),
                min("order_date").alias("first_purchase_date")
            )
        
        # Add derived metrics
        df_customer_metrics = df_customer_metrics.withColumn(
            "customer_tenure_days",
            datediff(col("last_purchase_date"), col("first_purchase_date"))
        ).withColumn(
            "avg_days_between_orders",
            col("customer_tenure_days") / col("total_orders")
        )
        
        # Product-level metrics
        df_product_metrics = df.groupBy("product_id", "product_name") \
            .agg(
                count("order_id").alias("times_sold"),
                sum("quantity").alias("total_quantity"),
                sum("total_amount").alias("total_revenue")
            ) \
            .orderBy(desc("total_revenue"))
        
        return df_customer_metrics, df_product_metrics
    
    def run(self, date_from: str, date_to: str):
        """Ejecutar pipeline completo"""
        print(f"\n{'='*60}")
        print(f"🚀 Starting Spark ETL Pipeline")
        print(f"{'='*60}\n")
        
        # Extract
        df_sales = self.extract_sales_data(date_from, date_to)
        
        # Transform
        df_enriched = self.enrich_with_customer_data(df_sales)
        df_customer_metrics, df_product_metrics = self.calculate_metrics(df_enriched)
        
        # Load
        df_customer_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/customer_metrics/")
        
        df_product_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/product_metrics/")
        
        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")

# Usage
pipeline = SparkETLPipeline(spark)
pipeline.run("2024-01-01", "2024-12-31")

Spark Streaming (real-time processing):

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("StreamProcessing") \
    .getOrCreate()

# Read a stream from Kafka
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sales-events") \
    .load()

# Parse JSON
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

df_parsed = df_stream.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Windowed aggregation (last 5 minutes)
df_windowed = df_parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "customer_id"
    ) \
    .agg(
        count("order_id").alias("order_count"),
        sum("amount").alias("total_amount")
    )

# Write the stream (file sinks only support append mode; each window is
# emitted once the watermark passes)
query = df_windowed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/streaming-output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()

query.awaitTermination()

Recipe 5.4: Data Warehousing vs Data Lakes

What are they?

Data Warehouse: a central repository of structured, modeled data optimized for SQL analytics and BI (e.g. Snowflake, BigQuery, Redshift).

Data Lake: low-cost object storage (S3, ADLS, GCS) holding raw data of any format, with the schema applied when the data is read.

Comparison:

| Characteristic | Data Warehouse | Data Lake |
|----------------|----------------|-----------|
| Data | Structured | All types |
| Schema | Schema-on-write | Schema-on-read |
| Users | Business analysts | Data scientists, engineers |
| Processing | SQL | SQL, Python, Spark |
| Cost | 💰💰💰 High | 💰 Low |
| Performance | 🟢 Fast | 🟡 Depends |
| Flexibility | 🔴 Rigid | 🟢 Very flexible |
| Use cases | BI, reporting | ML, exploration, archives |
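
A minimal sketch of the schema row above (table name and paths are illustrative): a warehouse enforces the schema when data is written, while a lake applies a schema only when the data is read:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

spark = SparkSession.builder.appName("SchemaDemo").getOrCreate()

# Schema-on-write (warehouse style): declare the table up front; every load must conform
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders_wh (
        order_id STRING,
        customer_id STRING,
        total_amount DECIMAL(10,2),
        order_date DATE
    )
""")

# Schema-on-read (lake style): raw JSON sits in object storage as-is,
# and a schema is applied only at read time
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("total_amount", DoubleType()),
    StructField("order_date", DateType())
])

df_orders = spark.read.schema(order_schema).json("s3://bucket/raw/orders/")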

Data Lakehouse architecture (the best of both):

┌─────────────────────────────────────────────────────────────┐
│                    Data Sources                              │
├─────────────────────────────────────────────────────────────┤
│  Databases │ APIs │ SaaS │ Logs │ IoT │ Files              │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                Raw Layer (Data Lake)                         │
├─────────────────────────────────────────────────────────────┤
│  S3/Azure Data Lake/GCS                                     │
│  • Format: JSON, CSV, Avro, Parquet                         │
│  • No transformation                                         │
│  • Partitioned by date                                       │
└──────────────────────┬──────────────────────────────────────┘

                       ▼ (ETL/ELT)
┌─────────────────────────────────────────────────────────────┐
│              Processed Layer (Bronze → Silver)               │
├─────────────────────────────────────────────────────────────┤
│  Delta Lake / Apache Iceberg                                │
│  • Format: Parquet with metadata                             │
│  • Cleaned and validated                                     │
│  • ACID transactions                                         │
└──────────────────────┬──────────────────────────────────────┘

                       ▼ (Transformations)
┌─────────────────────────────────────────────────────────────┐
│              Curated Layer (Gold)                            │
├─────────────────────────────────────────────────────────────┤
│  • Dimensional modeling (star schema)                       │
│  • Aggregations and metrics                                  │
│  • Optimized for queries                                     │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                 Consumption Layer                            │
├─────────────────────────────────────────────────────────────┤
│  BI Tools │ ML Models │ Apps │ APIs                         │
└─────────────────────────────────────────────────────────────┘

Medallion Architecture (Bronze → Silver → Gold):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# ===== BRONZE LAYER: raw data =====

def ingest_to_bronze(source_path: str, bronze_path: str):
    """Load raw data without any transformation"""
    
    df_raw = spark.read.json(source_path)
    
    # Add ingestion metadata (including the date column used for partitioning)
    df_bronze = df_raw \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_ingested_date", to_date(current_timestamp())) \
        .withColumn("_source_file", input_file_name())
    
    # Write to Bronze (append only)
    df_bronze.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingested_date") \
        .save(bronze_path)
    
    print(f"✅ Ingested {df_bronze.count()} records to Bronze")

# ===== SILVER LAYER: clean data =====

def bronze_to_silver(bronze_path: str, silver_path: str):
    """Clean and validate the data"""
    
    df_bronze = spark.read.format("delta").load(bronze_path)
    
    # Transformations
    df_silver = df_bronze \
        .dropna(subset=["order_id", "customer_id"]) \
        .dropDuplicates(["order_id"]) \
        .withColumn("order_date", to_date("order_date")) \
        .withColumn("total_amount", col("total_amount").cast("decimal(10,2)")) \
        .filter(col("total_amount") > 0) \
        .select(
            "order_id",
            "customer_id",
            "order_date",
            "total_amount",
            "status",
            "_ingested_at"
        )
    
    # Incremental merge (upsert)
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_silver = DeltaTable.forPath(spark, silver_path)
        
        delta_silver.alias("target").merge(
            df_silver.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        df_silver.write \
            .format("delta") \
            .mode("overwrite") \
            .save(silver_path)
    
    print(f"✅ Processed {df_silver.count()} records to Silver")

# ===== GOLD LAYER: aggregated data for analytics =====

def silver_to_gold(silver_path: str, gold_path: str):
    """Build aggregated tables for BI"""
    
    df_silver = spark.read.format("delta").load(silver_path)
    
    # Metric: revenue per day
    df_daily_revenue = df_silver \
        .groupBy("order_date") \
        .agg(
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_order_value")
        ) \
        .orderBy("order_date")
    
    df_daily_revenue.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/daily_revenue")
    
    # Metric: customer lifetime value
    window_spec = Window.partitionBy("customer_id").orderBy("order_date")
    
    df_customer_ltv = df_silver \
        .withColumn("cumulative_revenue", sum("total_amount").over(window_spec)) \
        .groupBy("customer_id") \
        .agg(
            count("order_id").alias("total_orders"),
            max("cumulative_revenue").alias("lifetime_value"),
            max("order_date").alias("last_order_date")
        )
    
    df_customer_ltv.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/customer_ltv")
    
    print("✅ Created Gold layer tables")

# Full pipeline
def run_medallion_pipeline():
    """Run the full Bronze → Silver → Gold pipeline"""
    
    # Paths
    source_path = "s3://raw-data/orders/*.json"
    bronze_path = "s3://datalake/bronze/orders"
    silver_path = "s3://datalake/silver/orders"
    gold_path = "s3://datalake/gold"
    
    # Execute layers
    ingest_to_bronze(source_path, bronze_path)
    bronze_to_silver(bronze_path, silver_path)
    silver_to_gold(silver_path, gold_path)
    
    print("\n✅ Medallion pipeline completed")

run_medallion_pipeline()

Time travel with Delta Lake:

# Read a specific version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path")

# Read a snapshot as of a timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("path")

# View the change history
delta_table = DeltaTable.forPath(spark, "path")
delta_table.history().show()

# Roll back to a previous version
delta_table.restoreToVersion(5)

Recipe 5.5: Stream Processing with Kafka

What is Kafka? A distributed streaming platform for publishing, subscribing to, and processing data streams in real time.

Key concepts: a topic is a named stream split into partitions; every message gets an offset within its partition; producers write to topics, consumers read from them, and consumers in the same consumer group share the partitions between them; brokers store and replicate the partitions.

Producer (Python):

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
from datetime import datetime
import time

class SalesEventProducer:
    """Sales event producer"""
    
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for acknowledgement from all in-sync replicas
            retries=3,
            max_in_flight_requests_per_connection=1  # Preserve message ordering
        )
        self.topic = 'sales-events'
    
    def send_event(self, event: dict):
        """Send a sales event"""
        
        # Key used for partitioning (all events for the same customer go to the same partition)
        key = event.get('customer_id')
        
        # Add a timestamp
        event['timestamp'] = datetime.now().isoformat()
        
        # Send async
        future = self.producer.send(
            self.topic,
            key=key,
            value=event
        )
        
        # Wait for the result to handle success/error
        try:
            record_metadata = future.get(timeout=10)
            print(f"✅ Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")
        except KafkaError as e:
            print(f"❌ Failed to send: {e}")
    
    def send_batch(self, events: list):
        """Send multiple events"""
        for event in events:
            self.send_event(event)
        
        # Flush to make sure everything was delivered
        self.producer.flush()
    
    def close(self):
        """Close the producer"""
        self.producer.close()

# Usage
producer = SalesEventProducer()

# Simulate events
for i in range(100):
    event = {
        'order_id': f'order_{i}',
        'customer_id': f'customer_{i % 10}',  # 10 customers
        'product_id': f'product_{i % 5}',
        'quantity': 1,
        'amount': 99.99
    }
    producer.send_event(event)
    time.sleep(0.1)

producer.close()

Consumer (Python):

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

class SalesEventConsumer:
    """Sales event consumer"""
    
    def __init__(
        self,
        bootstrap_servers=['localhost:9092'],
        group_id='sales-processor'
    ):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',  # 'earliest' or 'latest'
            enable_auto_commit=False,  # Manual commits for finer control
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=100  # Batch size
        )
    
    def process_events(self):
        """Process events continuously"""
        
        try:
            for message in self.consumer:
                # Get the event
                event = message.value
                
                print(f"\n📨 Received from partition {message.partition}, offset {message.offset}")
                print(f"Event: {event}")
                
                # Process the event
                try:
                    self.process_single_event(event)
                    
                    # Commit the offset after successful processing
                    self.consumer.commit()
                    
                except Exception as e:
                    print(f"❌ Error processing event: {e}")
                    # Do not commit; the event will be re-processed on the next poll
        
        except KeyboardInterrupt:
            print("\n🛑 Stopping consumer...")
        finally:
            self.consumer.close()
    
    def process_single_event(self, event: dict):
        """Process a single event"""
        
        # Validate the event
        if not event.get('order_id'):
            raise ValueError("Missing order_id")
        
        # Business logic
        print(f"Processing order {event['order_id']} for customer {event['customer_id']}")
        
        # Save to a database, send a notification, etc.
        # save_to_database(event)
        # send_notification(event)
    
    def process_batch(self):
        """Process events in batches"""
        
        batch = []
        batch_size = 100
        
        try:
            for message in self.consumer:
                batch.append(message.value)
                
                if len(batch) >= batch_size:
                    # Process the batch
                    self.process_batch_events(batch)
                    
                    # Commit
                    self.consumer.commit()
                    
                    batch = []
        
        except KeyboardInterrupt:
            # Process the remaining batch
            if batch:
                self.process_batch_events(batch)
                self.consumer.commit()
        finally:
            self.consumer.close()
    
    def process_batch_events(self, events: list):
        """Process a batch of events"""
        print(f"📦 Processing batch of {len(events)} events")
        
        # Bulk insert into a database, etc.
        # bulk_insert_to_db(events)

# Usage
consumer = SalesEventConsumer()
consumer.process_events()

Stream processing in the style of Kafka Streams (conceptual, in Python):

from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime, timedelta

class StreamProcessor:
    """Stream processing with windowed aggregations"""
    
    def __init__(self):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=['localhost:9092'],
            group_id='stream-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # State store (in production, use RocksDB or similar)
        self.state = defaultdict(lambda: {
            'total_amount': 0,
            'count': 0,
            'window_start': None
        })
    
    def process_stream(self):
        """Process the stream with time-windowed aggregations"""
        
        window_duration = timedelta(minutes=5)
        
        for message in self.consumer:
            event = message.value
            customer_id = event['customer_id']
            current_time = datetime.now()
            
            # Update state
            state = self.state[customer_id]
            if state['window_start'] is None:
                state['window_start'] = current_time
            state['total_amount'] += event['amount']
            state['count'] += 1
            
            # Emit and reset once the window has been open long enough
            if current_time - state['window_start'] >= window_duration:
                # Emit the aggregated result
                result = {
                    'customer_id': customer_id,
                    'window_start': state['window_start'].isoformat(),
                    'window_end': current_time.isoformat(),
                    'total_amount': state['total_amount'],
                    'order_count': state['count'],
                    'avg_order_value': state['total_amount'] / state['count']
                }
                
                # Send to the results topic
                self.producer.send('customer-metrics', value=result)
                
                # Reset state for the next window
                self.state[customer_id] = {
                    'total_amount': 0,
                    'count': 0,
                    'window_start': current_time
                }
            
            # Commit
            self.consumer.commit()

# Usage
processor = StreamProcessor()
processor.process_stream()

Kafka with Apache Flink (advanced processing):

# Conceptual example (Flink is mainly used from Java/Scala; the window classes,
# CustomerRevenueAggregator and kafka_sink below are placeholders to be defined)

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.functions import MapFunction, KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

# Setup environment
env = StreamExecutionEnvironment.get_execution_environment()

# Kafka source
kafka_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'flink-consumer'
}

kafka_source = FlinkKafkaConsumer(
    topics='sales-events',
    deserialization_schema=SimpleStringSchema(),
    properties=kafka_props
)

# Stream pipeline
stream = env.add_source(kafka_source)

# Parse JSON and key by customer
parsed_stream = stream.map(lambda x: json.loads(x)) \
    .key_by(lambda x: x['customer_id'])

# Windowed aggregation
result_stream = parsed_stream \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .aggregate(CustomerRevenueAggregator())

# Sink to output
result_stream.add_sink(kafka_sink)

# Execute
env.execute("Sales Stream Processing")

SQL and Python for Data Engineering

Recipe 5.6: Advanced SQL - Window Functions and CTEs

Window Functions for Analytics:

-- ===== RANKING =====

-- ROW_NUMBER: unique numbering, no ties
SELECT
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (
        PARTITION BY customer_id 
        ORDER BY order_date DESC
    ) as order_number
FROM orders;

-- RANK: ties share a rank (gaps in the numbering)
SELECT
    product_name,
    category,
    revenue,
    RANK() OVER (
        PARTITION BY category 
        ORDER BY revenue DESC
    ) as revenue_rank
FROM products;

-- DENSE_RANK: ties share a rank (no gaps)
SELECT
    salesperson,
    sales_amount,
    DENSE_RANK() OVER (
        ORDER BY sales_amount DESC
    ) as rank
FROM sales;

-- ===== DISTRIBUTION =====

-- NTILE: split rows into N equal-sized groups
SELECT
    customer_id,
    total_revenue,
    NTILE(4) OVER (
        ORDER BY total_revenue DESC
    ) as quartile
FROM customer_summary;

-- PERCENT_RANK: relative percentile
SELECT
    product_id,
    price,
    PERCENT_RANK() OVER (
        ORDER BY price
    ) as price_percentile
FROM products;

-- ===== WINDOWED AGGREGATIONS =====

-- Running total
SELECT
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_revenue
FROM daily_sales;

-- Moving average (last 7 days)
SELECT
    sale_date,
    revenue,
    AVG(revenue) OVER (
        ORDER BY sale_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7day
FROM daily_revenue;

-- Year-over-Year comparison
SELECT
    sale_date,
    revenue,
    LAG(revenue, 365) OVER (ORDER BY sale_date) as revenue_last_year,
    revenue - LAG(revenue, 365) OVER (ORDER BY sale_date) as yoy_change,
    ROUND(
        (revenue - LAG(revenue, 365) OVER (ORDER BY sale_date)) / 
        LAG(revenue, 365) OVER (ORDER BY sale_date) * 100,
        2
    ) as yoy_change_pct
FROM daily_revenue;

-- ===== LEAD and LAG =====

-- Next and previous values
SELECT
    customer_id,
    order_date,
    total_amount,
    LAG(order_date) OVER (
        PARTITION BY customer_id 
        ORDER BY order_date
    ) as previous_order_date,
    LEAD(order_date) OVER (
        PARTITION BY customer_id 
        ORDER BY order_date
    ) as next_order_date,
    order_date - LAG(order_date) OVER (
        PARTITION BY customer_id 
        ORDER BY order_date
    ) as days_since_last_order
FROM orders;

-- ===== FIRST_VALUE and LAST_VALUE =====

-- Compare against the first and last value in the window
SELECT
    product_id,
    sale_date,
    price,
    FIRST_VALUE(price) OVER (
        PARTITION BY product_id 
        ORDER BY sale_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as initial_price,
    LAST_VALUE(price) OVER (
        PARTITION BY product_id 
        ORDER BY sale_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as current_price
FROM product_prices;

CTEs (Common Table Expressions):

-- ===== SIMPLE CTE =====

-- Compute intermediate metrics
WITH customer_metrics AS (
    SELECT
        customer_id,
        COUNT(*) as total_orders,
        SUM(total_amount) as total_revenue,
        AVG(total_amount) as avg_order_value,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
)
SELECT
    c.customer_name,
    c.email,
    cm.total_orders,
    cm.total_revenue,
    cm.avg_order_value,
    CURRENT_DATE - cm.last_order_date as days_since_last_order
FROM customers c
JOIN customer_metrics cm ON c.id = cm.customer_id
WHERE cm.total_revenue > 1000
ORDER BY cm.total_revenue DESC;

-- ===== MULTIPLE CTEs =====

WITH
-- CTE 1: monthly sales
monthly_sales AS (
    SELECT
        DATE_TRUNC('month', order_date) as month,
        SUM(total_amount) as revenue,
        COUNT(*) as order_count
    FROM orders
    GROUP BY DATE_TRUNC('month', order_date)
),
-- CTE 2: add derived metrics
sales_with_metrics AS (
    SELECT
        month,
        revenue,
        order_count,
        revenue / order_count as avg_order_value,
        LAG(revenue) OVER (ORDER BY month) as prev_month_revenue
    FROM monthly_sales
),
-- CTE 3: compute growth
sales_with_growth AS (
    SELECT
        month,
        revenue,
        prev_month_revenue,
        CASE 
            WHEN prev_month_revenue IS NOT NULL THEN
                ROUND((revenue - prev_month_revenue) / prev_month_revenue * 100, 2)
            ELSE NULL
        END as mom_growth_pct
    FROM sales_with_metrics
)
SELECT * 
FROM sales_with_growth
WHERE mom_growth_pct IS NOT NULL
ORDER BY month DESC;

-- ===== RECURSIVE CTE: hierarchies =====

-- Employee org chart
WITH RECURSIVE employee_hierarchy AS (
    -- Base case: the CEO (no manager)
    SELECT
        id,
        name,
        manager_id,
        1 as level,
        name::TEXT as path
    FROM employees
    WHERE manager_id IS NULL
    
    UNION ALL
    
    -- Recursive case: employees with a manager
    SELECT
        e.id,
        e.name,
        e.manager_id,
        eh.level + 1,
        eh.path || ' > ' || e.name
    FROM employees e
    INNER JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
    id,
    name,
    level,
    path
FROM employee_hierarchy
ORDER BY level, name;

-- Date series
WITH RECURSIVE date_series AS (
    SELECT DATE '2024-01-01' as date
    UNION ALL
    SELECT date + INTERVAL '1 day'
    FROM date_series
    WHERE date < DATE '2024-12-31'
)
SELECT 
    ds.date,
    COALESCE(SUM(o.total_amount), 0) as revenue
FROM date_series ds
LEFT JOIN orders o ON DATE(o.order_date) = ds.date
GROUP BY ds.date
ORDER BY ds.date;

Complex queries for Data Engineering:

-- ===== COHORT ANALYSIS =====

-- Retention analysis by cohort
WITH first_purchase AS (
    SELECT
        customer_id,
        MIN(DATE_TRUNC('month', order_date)) as cohort_month
    FROM orders
    GROUP BY customer_id
),
customer_orders AS (
    SELECT
        o.customer_id,
        fp.cohort_month,
        DATE_TRUNC('month', o.order_date) as order_month,
        EXTRACT(YEAR FROM AGE(o.order_date, fp.cohort_month)) * 12 +
        EXTRACT(MONTH FROM AGE(o.order_date, fp.cohort_month)) as months_since_first
    FROM orders o
    JOIN first_purchase fp ON o.customer_id = fp.customer_id
)
SELECT
    cohort_month,
    months_since_first,
    COUNT(DISTINCT customer_id) as customers,
    COUNT(*) as orders
FROM customer_orders
GROUP BY cohort_month, months_since_first
ORDER BY cohort_month, months_since_first;

-- ===== FUNNEL ANALYSIS =====

-- Conversion funnel
WITH funnel_steps AS (
    SELECT
        user_id,
        MAX(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as viewed,
        MAX(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as added_to_cart,
        MAX(CASE WHEN event_type = 'checkout' THEN 1 ELSE 0 END) as checked_out,
        MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchased
    FROM events
    WHERE event_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY user_id
)
SELECT
    COUNT(*) as total_users,
    SUM(viewed) as step1_viewed,
    SUM(added_to_cart) as step2_add_to_cart,
    SUM(checked_out) as step3_checkout,
    SUM(purchased) as step4_purchase,
    ROUND(SUM(added_to_cart)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as conversion_to_cart,
    ROUND(SUM(purchased)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as overall_conversion
FROM funnel_steps;

-- ===== RFM ANALYSIS (Recency, Frequency, Monetary) =====

WITH customer_rfm AS (
    SELECT
        customer_id,
        -- Recency: days since the last purchase
        CURRENT_DATE - MAX(order_date) as recency,
        -- Frequency: number of orders
        COUNT(*) as frequency,
        -- Monetary: total revenue
        SUM(total_amount) as monetary
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '1 year'
    GROUP BY customer_id
),
rfm_scores AS (
    SELECT
        customer_id,
        recency,
        frequency,
        monetary,
        -- Scores of 1-5 based on quintiles
        NTILE(5) OVER (ORDER BY recency DESC) as r_score,  -- Lower recency = better
        NTILE(5) OVER (ORDER BY frequency ASC) as f_score,
        NTILE(5) OVER (ORDER BY monetary ASC) as m_score
    FROM customer_rfm
)
SELECT
    customer_id,
    CASE
        WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champions'
        WHEN r_score >= 3 AND f_score >= 3 THEN 'Loyal Customers'
        WHEN r_score >= 4 AND f_score <= 2 THEN 'New Customers'
        WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
        WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
        ELSE 'Others'
    END as segment,
    r_score,
    f_score,
    m_score
FROM rfm_scores;

Recipe 5.7: Python for Data Engineering

Pandas - Data Manipulation:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ===== READ DATA =====

# CSV
df = pd.read_csv('sales.csv', parse_dates=['order_date'])

# Excel
df = pd.read_excel('data.xlsx', sheet_name='Sales')

# JSON
df = pd.read_json('data.json')

# SQL
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@host:5432/db')
df = pd.read_sql("SELECT * FROM orders", engine)

# Parquet
df = pd.read_parquet('data.parquet')

# ===== EXPLORATION =====

# Basic info
print(df.head())
print(df.info())
print(df.describe())

# Unique values
print(df['category'].unique())
print(df['category'].value_counts())

# Null values
print(df.isnull().sum())
print(df.isnull().sum() / len(df) * 100)  # Percentage

# ===== CLEANING =====

# Remove duplicates
df_clean = df.drop_duplicates(subset=['order_id'])

# Drop nulls
df_clean = df.dropna(subset=['customer_id', 'total_amount'])

# Fill nulls
df_filled = df.fillna({
    'discount': 0,
    'notes': 'No notes',
    'category': 'Uncategorized'
})

# Forward fill (use the previous value)
df['price'] = df['price'].ffill()

# Backward fill
df['price'] = df['price'].bfill()

# Interpolate numeric values
df['sales'] = df['sales'].interpolate(method='linear')

# ===== TRANSFORMATIONS =====

# Create new columns
df['revenue'] = df['quantity'] * df['price']
df['order_year'] = df['order_date'].dt.year
df['order_month'] = df['order_date'].dt.month
df['day_of_week'] = df['order_date'].dt.day_name()

# Apply function
def categorize_amount(amount):
    if amount < 100:
        return 'Low'
    elif amount < 500:
        return 'Medium'
    else:
        return 'High'

df['amount_category'] = df['total_amount'].apply(categorize_amount)

# Apply with a lambda
df['customer_name_upper'] = df['customer_name'].apply(lambda x: x.upper())

# Map with a dictionary
category_map = {'A': 'Premium', 'B': 'Standard', 'C': 'Basic'}
df['tier'] = df['category'].map(category_map)

# String operations
df['email'] = df['email'].str.lower().str.strip()
df['product_name'] = df['product_name'].str.title()

# ===== AGGREGATIONS =====

# Simple group by
summary = df.groupby('customer_id').agg({
    'order_id': 'count',
    'total_amount': ['sum', 'mean', 'max'],
    'order_date': 'max'
}).reset_index()

# Rename columns after the aggregation
summary.columns = [
    'customer_id',
    'total_orders',
    'total_revenue',
    'avg_order_value',
    'max_order_value',
    'last_order_date'
]

# Group by multiple columns
monthly_summary = df.groupby([
    df['order_date'].dt.to_period('M'),
    'product_category'
]).agg({
    'total_amount': 'sum',
    'order_id': 'count'
}).reset_index()

# Custom aggregation function
def revenue_per_customer(group):
    return group['total_amount'].sum() / group['customer_id'].nunique()

df.groupby('product_category').apply(revenue_per_customer)

# ===== PIVOT TABLES =====

# Pivot
pivot = df.pivot_table(
    values='total_amount',
    index='customer_id',
    columns='product_category',
    aggfunc='sum',
    fill_value=0
)

# Multiple aggregations
pivot_multi = df.pivot_table(
    values='total_amount',
    index='order_date',
    columns='product_category',
    aggfunc=['sum', 'count', 'mean']
)

# ===== MERGE AND JOIN =====

# Inner join
df_merged = pd.merge(
    df_orders,
    df_customers,
    left_on='customer_id',
    right_on='id',
    how='inner'
)

# Left join
df_all_orders = pd.merge(
    df_orders,
    df_customers,
    left_on='customer_id',
    right_on='id',
    how='left'
)

# Multiple joins
df_complete = df_orders \
    .merge(df_customers, left_on='customer_id', right_on='id') \
    .merge(df_products, left_on='product_id', right_on='id')

# ===== WINDOW FUNCTIONS =====

# Ranking
df['revenue_rank'] = df.groupby('customer_id')['total_amount'].rank(
    ascending=False,
    method='dense'
)

# Cumulative sum
df['cumulative_revenue'] = df.groupby('customer_id')['total_amount'].cumsum()

# Moving average
df['moving_avg_3'] = df.groupby('customer_id')['total_amount'] \
    .transform(lambda x: x.rolling(window=3, min_periods=1).mean())

# Shift (lag/lead)
df['prev_order_amount'] = df.groupby('customer_id')['total_amount'].shift(1)
df['next_order_amount'] = df.groupby('customer_id')['total_amount'].shift(-1)
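
# Ejemplo de uso (boceto): días entre pedidos consecutivos de cada cliente,
# combinando sort_values + shift (asume que 'order_date' ya es datetime)
df = df.sort_values(['customer_id', 'order_date'])
df['days_since_prev_order'] = (
    df['order_date'] - df.groupby('customer_id')['order_date'].shift(1)
).dt.days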

# ===== TIME SERIES =====

# Resample (agregación temporal)
df.set_index('order_date', inplace=True)

# Daily to monthly ('M' = fin de mes; en pandas >= 2.2 el alias preferido es 'ME')
monthly = df.resample('M').agg({
    'total_amount': 'sum',
    'order_id': 'count'
})

# Rolling window
df['7day_avg'] = df['total_amount'].rolling(window=7).mean()
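
# Boceto: con un índice datetime ordenado también se puede usar una ventana temporal
# ('7D') en lugar de un número fijo de filas
df = df.sort_index()
df['7day_avg_time'] = df['total_amount'].rolling('7D').mean()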

# ===== CASO DE USO COMPLETO =====

class DataPipeline:
    """Pipeline completo de procesamiento con Pandas"""
    
    def __init__(self, input_path: str):
        self.input_path = input_path
        self.df = None
    
    def extract(self):
        """Extraer datos"""
        print("📥 Extracting data...")
        self.df = pd.read_csv(self.input_path, parse_dates=['order_date'])
        print(f"✅ Loaded {len(self.df)} rows")
        return self
    
    def clean(self):
        """Limpiar datos"""
        print("🧹 Cleaning data...")
        
        # Remover duplicados
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=['order_id'])
        print(f"  Removed {before - len(self.df)} duplicates")
        
        # Remover nulls en columnas críticas
        before = len(self.df)
        self.df = self.df.dropna(subset=['customer_id', 'total_amount'])
        print(f"  Removed {before - len(self.df)} rows with nulls")
        
        # Rellenar otros nulls
        self.df['discount'] = self.df['discount'].fillna(0)
        
        # Normalizar strings
        self.df['customer_email'] = self.df['customer_email'].str.lower().str.strip()
        
        # Filtrar valores inválidos
        before = len(self.df)
        self.df = self.df[self.df['total_amount'] > 0]
        print(f"  Removed {before - len(self.df)} invalid amounts")
        
        print(f"✅ Clean dataset: {len(self.df)} rows")
        return self
    
    def transform(self):
        """Transformar y enriquecer datos"""
        print("⚙️ Transforming data...")
        
        # Columnas derivadas
        self.df['revenue'] = self.df['quantity'] * self.df['price']
        self.df['order_year'] = self.df['order_date'].dt.year
        self.df['order_month'] = self.df['order_date'].dt.month
        
        # Categorización
        self.df['revenue_tier'] = pd.cut(
            self.df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )
        
        # Agregaciones por cliente
        customer_metrics = self.df.groupby('customer_id').agg({
            'order_id': 'count',
            'total_amount': 'sum'
        }).rename(columns={
            'order_id': 'total_orders',
            'total_amount': 'customer_ltv'
        })
        
        self.df = self.df.merge(customer_metrics, on='customer_id', how='left')
        
        print("✅ Transformation complete")
        return self
    
    def validate(self):
        """Validar calidad de datos"""
        print("✓ Validating data quality...")
        
        # Check 1: No nulls en columnas críticas
        critical_cols = ['order_id', 'customer_id', 'total_amount']
        nulls = self.df[critical_cols].isnull().sum()
        assert nulls.sum() == 0, f"Found nulls in critical columns: {nulls}"
        
        # Check 2: No duplicados
        duplicates = self.df.duplicated(subset=['order_id']).sum()
        assert duplicates == 0, f"Found {duplicates} duplicate order_ids"
        
        # Check 3: Valores positivos
        assert (self.df['total_amount'] > 0).all(), "Found negative amounts"
        
        # Check 4: Fechas válidas
        assert (self.df['order_date'] <= datetime.now()).all(), "Found future dates"
        
        print("✅ All validation checks passed")
        return self
    
    def load(self, output_path: str):
        """Guardar datos procesados"""
        print(f"💾 Loading to {output_path}...")
        
        self.df.to_parquet(
            output_path,
            index=False,
            compression='snappy',
            partition_cols=['order_year', 'order_month']
        )
        
        print(f"✅ Saved {len(self.df)} rows")
        return self
    
    def run(self, output_path: str):
        """Ejecutar pipeline completo"""
        print(f"\n{'='*60}")
        print(f"🚀 Starting Pandas ETL Pipeline")
        print(f"{'='*60}\n")
        
        self.extract() \
            .clean() \
            .transform() \
            .validate() \
            .load(output_path)
        
        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")

# Uso
pipeline = DataPipeline('raw_sales.csv')
pipeline.run('processed_sales.parquet')
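
# Boceto (supuesto): para archivos que no caben en memoria, extract() puede
# leerse por chunks con el parámetro chunksize de read_csv
chunks = pd.read_csv('raw_sales.csv', parse_dates=['order_date'], chunksize=100_000)
df_large = pd.concat(chunk.dropna(subset=['customer_id']) for chunk in chunks)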

NumPy - Operaciones Numéricas:

import numpy as np

# ===== ARRAYS =====

# Crear arrays
arr = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])

# Arrays especiales
zeros = np.zeros((3, 4))
ones = np.ones((2, 3))
identity = np.eye(3)
random = np.random.rand(3, 3)

# Range
arr = np.arange(0, 10, 2)  # [0, 2, 4, 6, 8]
arr = np.linspace(0, 1, 5)  # [0, 0.25, 0.5, 0.75, 1]

# ===== OPERACIONES =====

# Elementwise
arr = np.array([1, 2, 3])
arr * 2  # [2, 4, 6]
arr + arr  # [2, 4, 6]
arr ** 2  # [1, 4, 9]

# Agregaciones
arr.sum()
arr.mean()
arr.std()
arr.max()
arr.min()

# Axis-wise operations
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_2d.sum(axis=0)  # [5, 7, 9] - columnas
arr_2d.sum(axis=1)  # [6, 15] - filas

# ===== INDEXING Y SLICING =====

arr = np.array([1, 2, 3, 4, 5])
arr[0]  # 1
arr[-1]  # 5
arr[1:4]  # [2, 3, 4]
arr[arr > 2]  # [3, 4, 5] - boolean indexing

# ===== BROADCASTING =====

arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_1d = np.array([10, 20, 30])

result = arr_2d + arr_1d  # Broadcasting
# [[11, 22, 33],
#  [14, 25, 36]]
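
# Ejemplo de uso (boceto): estandarizar columnas de una matriz con broadcasting
# (restar la media y dividir por la desviación estándar de cada columna)
data = np.random.rand(100, 3)
standardized = (data - data.mean(axis=0)) / data.std(axis=0)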

# ===== CASO DE USO: Procesamiento de métricas =====

def calculate_moving_average(data, window):
    """Calcular moving average eficientemente con NumPy"""
    weights = np.ones(window) / window
    return np.convolve(data, weights, mode='valid')

# Datos de ventas diarias
sales = np.random.randint(100, 500, size=365)

# Moving average de 7 días
ma_7 = calculate_moving_average(sales, 7)

# Detectar outliers
mean = sales.mean()
std = sales.std()
outliers = sales[(sales < mean - 2*std) | (sales > mean + 2*std)]
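
# Boceto ilustrativo: np.select categoriza de forma vectorizada, como alternativa
# más rápida al .apply(categorize_amount) mostrado en la sección de Pandas
amounts = np.array([50, 250, 800, 1200])
conditions = [amounts < 100, amounts < 500]
labels = ['Low', 'Medium']
categories = np.select(conditions, labels, default='High')  # ['Low' 'Medium' 'High' 'High']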

Receta 5.8: Data Quality - Validación y Limpieza

Great Expectations - Framework de validación:

import great_expectations as gx
import pandas as pd

# ===== SETUP =====
# Nota: la API de Great Expectations cambia bastante entre versiones;
# este ejemplo asume la "Fluent Datasources API" (aprox. GX 0.16/0.17)

# Inicializar contexto
context = gx.get_context()

# Crear (o actualizar) la expectation suite
suite = context.add_or_update_expectation_suite(
    expectation_suite_name="sales_data_quality"
)

# ===== EXPECTATIVAS =====

# Tabla level
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1,
            "max_value": 1000000
        }
    )
)

# Columna existe
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "order_id"}
    )
)

# No nulls
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    )
)

# Valores únicos
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    )
)

# Valores en set
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "completed", "cancelled"]
        }
    )
)

# Rango numérico
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "total_amount",
            "min_value": 0,
            "max_value": 100000
        }
    )
)

# Regex pattern
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "email",
            "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        }
    )
)

# ===== VALIDAR DATOS =====

# Cargar datos
df = pd.read_csv("sales.csv")

# Crear batch request con la Fluent Datasources API
datasource = context.sources.add_pandas("sales_source")
asset = datasource.add_dataframe_asset(name="sales_data")
batch_request = asset.build_batch_request(dataframe=df)

# Obtener un validator asociado a la suite y ejecutar las validaciones
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite
)
results = validator.validate()

# Revisar resultados
if results.success:
    print("✅ All validations passed!")
else:
    print("❌ Validations failed:")
    for result in results.results:
        if not result.success:
            print(result.expectation_config.expectation_type)
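
Alternativa ligera con pandera (boceto ilustrativo; las columnas, tipos y rangos son supuestos de ejemplo):

import pandera as pa

# Esquema declarativo de validación
schema = pa.DataFrameSchema({
    "order_id": pa.Column(int, unique=True),
    "customer_id": pa.Column(int, nullable=False),
    "status": pa.Column(str, checks=pa.Check.isin(["pending", "completed", "cancelled"])),
    "total_amount": pa.Column(float, checks=pa.Check.in_range(0, 100000)),
})

try:
    schema.validate(df, lazy=True)  # lazy=True acumula todos los errores antes de fallar
    print("✅ Schema validation passed")
except pa.errors.SchemaErrors as e:
    print("❌ Schema validation failed:")
    print(e.failure_cases)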

Data Profiling con ydata-profiling (antes pandas-profiling):

from ydata_profiling import ProfileReport
import pandas as pd

# Cargar datos
df = pd.read_csv("sales.csv")

# Generar reporte
profile = ProfileReport(
    df,
    title="Sales Data Profile Report",
    explorative=True,
    minimal=False
)

# Guardar reporte HTML
profile.to_file("sales_profile_report.html")

# Ver estadísticas inline
print(profile.description_set)

# Identificar alertas automáticamente
# (en ydata-profiling 4.x get_description() devuelve un objeto, no un dict)
print(profile.get_description().alerts)
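
# Alternativa mínima sin dependencias extra (boceto): un perfil rápido con pandas
print(df.describe(include='all').T)
print(df.dtypes)
print(df.isnull().mean().sort_values(ascending=False))  # proporción de nulos por columna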

Detección y manejo de outliers:

import pandas as pd
import numpy as np
from scipy import stats

class OutlierDetector:
    """Detectar y manejar outliers"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.outliers = {}
    
    def detect_iqr(self, column: str, threshold=1.5):
        """Método IQR (Interquartile Range)"""
        Q1 = self.df[column].quantile(0.25)
        Q3 = self.df[column].quantile(0.75)
        IQR = Q3 - Q1
        
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR
        
        outliers = self.df[
            (self.df[column] < lower_bound) | 
            (self.df[column] > upper_bound)
        ]
        
        self.outliers[column] = {
            'method': 'IQR',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def detect_zscore(self, column: str, threshold=3):
        """Método Z-score"""
        z_scores = np.abs(stats.zscore(self.df[column]))
        outliers = self.df[z_scores > threshold]
        
        self.outliers[column] = {
            'method': 'Z-score',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'threshold': threshold,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def detect_isolation_forest(self, columns: list):
        """Método Isolation Forest (ML-based)"""
        from sklearn.ensemble import IsolationForest
        
        iso_forest = IsolationForest(
            contamination=0.1,  # 10% esperado de outliers
            random_state=42
        )
        
        predictions = iso_forest.fit_predict(self.df[columns])
        outliers = self.df[predictions == -1]
        
        self.outliers['multivariate'] = {
            'method': 'Isolation Forest',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def remove_outliers(self, column: str, method='iqr'):
        """Remover outliers usando la última detección registrada para la columna"""
        if column not in self.outliers:
            raise ValueError(f"No hay outliers detectados para '{column}'")
        outlier_indices = self.outliers[column]['indices']
        return self.df.drop(outlier_indices)
    
    def cap_outliers(self, column: str):
        """Cap outliers (winsorization)"""
        info = self.outliers[column]
        
        self.df[column] = self.df[column].clip(
            lower=info['lower_bound'],
            upper=info['upper_bound']
        )
        
        return self.df
    
    def summary(self):
        """Resumen de outliers detectados"""
        for col, info in self.outliers.items():
            print(f"\n=== {col} ===")
            print(f"Method: {info['method']}")
            print(f"Count: {info['count']}")
            print(f"Percentage: {info['percentage']:.2f}%")

# Uso
df = pd.read_csv("sales.csv")
detector = OutlierDetector(df)

# Detectar con diferentes métodos
detector.detect_iqr('total_amount')
detector.detect_zscore('total_amount')

# Ver resumen
detector.summary()

# Remover outliers
df_clean = detector.remove_outliers('total_amount', method='iqr')
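
# Alternativa a eliminar filas: winsorizar (cap) los valores extremos.
# cap_outliers usa los límites calculados por detect_iqr, así que se re-ejecuta primero
detector.detect_iqr('total_amount')
df_capped = detector.cap_outliers('total_amount')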

Data Cleaning Pipeline completo:

import pandas as pd
import numpy as np
from datetime import datetime

class DataCleaner:
    """Pipeline de limpieza de datos completo"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.cleaning_log = []
    
    def log(self, message: str):
        """Log de operaciones"""
        self.cleaning_log.append({
            'timestamp': datetime.now(),
            'message': message
        })
        print(f"  {message}")
    
    def handle_missing_values(self, strategies: dict):
        """
        Manejar valores faltantes
        
        strategies = {
            'column1': 'drop',
            'column2': 'mean',
            'column3': 'mode',
            'column4': 'forward_fill',
            'column5': 0
        }
        """
        print("\n🧹 Handling missing values...")
        
        for col, strategy in strategies.items():
            if col not in self.df.columns:
                continue
            
            missing_count = self.df[col].isnull().sum()
            if missing_count == 0:
                continue
            
            if strategy == 'drop':
                before = len(self.df)
                self.df = self.df.dropna(subset=[col])
                self.log(f"Dropped {before - len(self.df)} rows with null {col}")
            
            elif strategy == 'mean':
                mean_val = self.df[col].mean()
                self.df[col] = self.df[col].fillna(mean_val)
                self.log(f"Filled {missing_count} nulls in {col} with mean ({mean_val:.2f})")
            
            elif strategy == 'median':
                median_val = self.df[col].median()
                self.df[col] = self.df[col].fillna(median_val)
                self.log(f"Filled {missing_count} nulls in {col} with median ({median_val:.2f})")
            
            elif strategy == 'mode':
                mode_val = self.df[col].mode()[0]
                self.df[col] = self.df[col].fillna(mode_val)
                self.log(f"Filled {missing_count} nulls in {col} with mode ({mode_val})")
            
            elif strategy == 'forward_fill':
                self.df[col] = self.df[col].ffill()
                self.log(f"Forward filled {missing_count} nulls in {col}")
            
            else:
                # Valor constante
                self.df[col] = self.df[col].fillna(strategy)
                self.log(f"Filled {missing_count} nulls in {col} with {strategy}")
        
        return self
    
    def remove_duplicates(self, subset=None, keep='first'):
        """Remover duplicados"""
        print("\n🔍 Removing duplicates...")
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        
        if removed > 0:
            self.log(f"Removed {removed} duplicate rows")
        else:
            self.log("No duplicates found")
        
        return self
    
    def standardize_text(self, columns: list):
        """Estandarizar columnas de texto"""
        print("\n✏️ Standardizing text...")
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            # Strip whitespace
            self.df[col] = self.df[col].str.strip()
            
            # Lowercase
            self.df[col] = self.df[col].str.lower()
            
            # Remove extra spaces
            self.df[col] = self.df[col].str.replace(r'\s+', ' ', regex=True)
            
            self.log(f"Standardized text in {col}")
        
        return self
    
    def fix_data_types(self, type_map: dict):
        """Corregir tipos de datos"""
        print("\n🔧 Fixing data types...")
        
        for col, dtype in type_map.items():
            if col not in self.df.columns:
                continue
            
            try:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col])
                else:
                    self.df[col] = self.df[col].astype(dtype)
                
                self.log(f"Converted {col} to {dtype}")
            except Exception as e:
                self.log(f"Failed to convert {col} to {dtype}: {str(e)}")
        
        return self
    
    def remove_outliers(self, columns: list, method='iqr', threshold=1.5):
        """Remover outliers"""
        print(f"\n📊 Removing outliers using {method} method...")
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            before = len(self.df)
            
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                
                self.df = self.df[
                    (self.df[col] >= lower) & 
                    (self.df[col] <= upper)
                ]
            
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} outliers from {col}")
        
        return self
    
    def validate_ranges(self, validations: dict):
        """Validar rangos de valores"""
        print("\n✓ Validating value ranges...")
        
        for col, (min_val, max_val) in validations.items():
            if col not in self.df.columns:
                continue
            
            before = len(self.df)
            self.df = self.df[
                (self.df[col] >= min_val) & 
                (self.df[col] <= max_val)
            ]
            
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} rows with {col} outside [{min_val}, {max_val}]")
        
        return self
    
    def get_cleaned_data(self):
        """Retornar datos limpios"""
        return self.df
    
    def get_cleaning_report(self):
        """Generar reporte de limpieza"""
        return pd.DataFrame(self.cleaning_log)

# ===== USO =====

# Cargar datos
df = pd.read_csv("raw_sales.csv")

# Pipeline de limpieza
cleaner = DataCleaner(df)

df_clean = cleaner \
    .handle_missing_values({
        'customer_id': 'drop',
        'total_amount': 'median',
        'discount': 0,
        'category': 'mode'
    }) \
    .remove_duplicates(subset=['order_id']) \
    .standardize_text(['customer_email', 'customer_name']) \
    .fix_data_types({
        'order_date': 'datetime',
        'total_amount': 'float64',
        'quantity': 'int32'
    }) \
    .validate_ranges({
        'total_amount': (0, 100000),
        'quantity': (1, 1000)
    }) \
    .remove_outliers(['total_amount'], method='iqr') \
    .get_cleaned_data()

# Ver reporte
report = cleaner.get_cleaning_report()
print("\n" + "="*60)
print("CLEANING REPORT")
print("="*60)
print(report)

# Guardar datos limpios
df_clean.to_parquet('clean_sales.parquet', index=False)
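
# Boceto: también puede persistirse el reporte de limpieza para auditoría
# (el nombre del archivo es un supuesto de ejemplo)
report.to_csv('cleaning_report.csv', index=False)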

¡Felicidades! 🎉

Has completado la FASE 5: Ingeniería de Datos del roadmap.

Lo que has aprendido:

✅ ETL vs ELT - Cuándo usar cada enfoque
✅ Apache Airflow - Orquestación de pipelines
✅ Apache Spark - Procesamiento distribuido
✅ Data Warehousing vs Data Lakes
✅ Medallion Architecture (Bronze/Silver/Gold)
✅ Stream Processing con Kafka
✅ SQL Avanzado - Window Functions y CTEs
✅ Pandas y NumPy para Data Engineering
✅ Data Quality y validación
✅ Data Cleaning pipelines

Próximos pasos:

FASE 6: Inteligencia Artificial y Colaboración con IA


Versión: 1.0
Fecha: 2024
Autor: Roadmap del Desarrollador del Futuro
Licencia: Uso educativo