📊 Developer Cookbook - PHASE 5: Data Pipelines
Practical recipes for building robust, scalable data pipelines
📚 Table of Contents
- Recipe 5.1: ETL vs ELT - Which One to Use?
- Recipe 5.2: Apache Airflow - Pipeline Orchestration
- Recipe 5.3: Apache Spark - Distributed Processing
- Recipe 5.4: Data Warehousing vs Data Lakes
- Recipe 5.5: Stream Processing with Kafka
Data Pipelines
Recipe 5.1: ETL vs ELT - Which One to Use?
What are they?
- ETL (Extract, Transform, Load): transform data BEFORE loading it into the destination
- ELT (Extract, Load, Transform): load raw data first and transform it AFTERWARDS in the destination
Comparison:
| Characteristic | ETL | ELT |
|---|---|---|
| Transformation | Before loading | After loading |
| Initial speed | 🟡 Slower | 🟢 Faster |
| Flexibility | 🔴 Less flexible | 🟢 Very flexible |
| Compute costs | 🟢 Predictable | 🟡 Variable |
| Best for | Traditional data warehouses | Cloud data warehouses |
| Storage | 🟢 Less storage | 🔴 More storage |
| Use cases | Sensitive data, compliance | Exploration, analytics |
ETL example with Python:
import pandas as pd
from datetime import datetime
import psycopg2
from typing import Dict, List
class ETLPipeline:
    """Traditional ETL pipeline"""
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
    def extract(self, query: str) -> pd.DataFrame:
        """E - Extract: pull data from the source"""
        print("📥 Extracting data from source...")
        # Connect to the source database
        conn = psycopg2.connect(**self.source_config)
        # Pull the data (pandas can read from a DBAPI connection;
        # a SQLAlchemy engine is preferred, but this works)
        df = pd.read_sql(query, conn)
        conn.close()
        print(f"✅ Extracted {len(df)} rows")
        return df
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """T - Transform: clean and transform the data"""
        print("⚙️ Transforming data...")
        # 1. Drop rows with null keys
        df = df.dropna(subset=['customer_id', 'order_date'])
        # 2. Normalize text columns
        df['customer_name'] = df['customer_name'].str.strip().str.title()
        df['email'] = df['email'].str.lower()
        # 3. Convert data types
        df['order_date'] = pd.to_datetime(df['order_date'])
        df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')
        # 4. Create derived columns
        df['order_year'] = df['order_date'].dt.year
        df['order_month'] = df['order_date'].dt.month
        df['order_quarter'] = df['order_date'].dt.quarter
        # 5. Segment customers
        df['customer_segment'] = pd.cut(
            df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )
        # 6. Remove duplicates
        df = df.drop_duplicates(subset=['order_id'])
        # 7. Add metadata
        df['etl_processed_at'] = datetime.now()
        df['etl_version'] = '1.0'
        print(f"✅ Transformed to {len(df)} rows")
        return df
    def load(self, df: pd.DataFrame, table_name: str):
        """L - Load: write data to the destination"""
        print(f"📤 Loading data to {table_name}...")
        # Connect to the target database
        conn = psycopg2.connect(**self.target_config)
        cursor = conn.cursor()
        # Truncate the table first (full refresh)
        cursor.execute(f"TRUNCATE TABLE {table_name}")
        # Bulk insert via COPY
        # (caveat: copy_from does not parse quoted CSV; text fields
        # containing ',' need copy_expert with CSV format)
        from io import StringIO
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        cursor.copy_from(
            buffer,
            table_name,
            sep=',',
            null='',
            columns=df.columns.tolist()
        )
        conn.commit()
        cursor.close()
        conn.close()
        print(f"✅ Loaded {len(df)} rows to {table_name}")
    def run(self, extract_query: str, target_table: str):
        """Run the full pipeline"""
        start_time = datetime.now()
        print(f"\n{'='*60}")
        print(f"🚀 Starting ETL Pipeline - {start_time}")
        print(f"{'='*60}\n")
        try:
            # Extract
            raw_data = self.extract(extract_query)
            # Transform
            clean_data = self.transform(raw_data)
            # Load
            self.load(clean_data, target_table)
            # Log success
            duration = (datetime.now() - start_time).total_seconds()
            print(f"\n{'='*60}")
            print(f"✅ ETL Pipeline completed successfully in {duration:.2f}s")
            print(f"{'='*60}\n")
        except Exception as e:
            print(f"\n❌ ETL Pipeline failed: {str(e)}")
            raise
# Usage
source_config = {
'host': 'source-db.example.com',
'database': 'production',
'user': 'etl_user',
'password': 'password'
}
target_config = {
'host': 'warehouse.example.com',
'database': 'analytics',
'user': 'warehouse_user',
'password': 'password'
}
pipeline = ETLPipeline(source_config, target_config)
extract_query = """
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.email,
o.order_date,
o.total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
"""
pipeline.run(extract_query, 'dim_orders')
ELT example with dbt (Data Build Tool):
-- models/staging/stg_orders.sql
-- Step 1: stage the raw data (already loaded into the warehouse)
{{
config(
materialized='view'
)
}}
SELECT
order_id,
customer_id,
order_date,
total_amount,
_loaded_at
FROM {{ source('raw', 'orders') }}
-- models/intermediate/int_orders_cleaned.sql
-- Step 2: clean the data
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
SELECT
order_id,
customer_id,
CAST(order_date AS DATE) as order_date,
ROUND(total_amount, 2) as total_amount,
EXTRACT(YEAR FROM order_date) as order_year,
EXTRACT(MONTH FROM order_date) as order_month,
EXTRACT(QUARTER FROM order_date) as order_quarter
FROM {{ ref('stg_orders') }}
WHERE order_date IS NOT NULL
AND total_amount > 0
{% if is_incremental() %}
-- Only process new records
AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
-- models/marts/fct_orders.sql
-- Step 3: build the final table for analytics
{{
config(
materialized='table'
)
}}
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_segment,
o.order_date,
o.order_year,
o.order_month,
o.total_amount,
CASE
WHEN o.total_amount < 100 THEN 'Bronze'
WHEN o.total_amount < 500 THEN 'Silver'
WHEN o.total_amount < 1000 THEN 'Gold'
ELSE 'Platinum'
END as order_tier
FROM {{ ref('int_orders_cleaned') }} o
LEFT JOIN {{ ref('dim_customers') }} c
ON o.customer_id = c.customer_id
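To materialize and test these models you would typically run dbt from the project root. A minimal programmatic sketch, assuming dbt-core ≥ 1.5 (which ships the dbtRunner API) and an already-configured profiles.yml; the selector reuses the model names above:
from dbt.cli.main import dbtRunner
runner = dbtRunner()
# Build stg_orders and everything downstream of it (int_orders_cleaned, fct_orders)
result = runner.invoke(["run", "--select", "stg_orders+"])
if not result.success:
    raise RuntimeError("dbt run failed")
# Run the project's schema and data tests
runner.invoke(["test"])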
When to use each:
ETL:
- ✅ Sensitive data (PII, compliance)
- ✅ Complex, CPU-heavy transformations
- ✅ Target database with limited resources
- ✅ You need to validate before loading
- ✅ On-premise data warehouses
ELT:
- ✅ Cloud data warehouses (Snowflake, BigQuery, Redshift)
- ✅ Data exploration (you don't yet know which transformations you need)
- ✅ Multiple teams with different needs
- ✅ Frequent changes to transformation logic
- ✅ Data lakes with semi-structured data
Recipe 5.2: Apache Airflow - Pipeline Orchestration
What is Airflow? A platform to author, schedule, and monitor data workflows as code (DAGs).
Key concepts:
- DAG (Directed Acyclic Graph): the complete workflow
- Task: a unit of work (one node of the DAG)
- Operator: a template for creating tasks (PythonOperator, BashOperator, etc.)
- Sensor: a task that waits for a condition
- XCom: data exchange between tasks
Basic DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd
# Default arguments applied to every task
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1)
}
# Define the DAG
dag = DAG(
    'daily_sales_etl',
    default_args=default_args,
    description='Daily sales ETL pipeline',
    schedule_interval='0 2 * * *',  # every day at 2am
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'etl', 'daily']
)
# Task 1: wait for the input file to exist
wait_for_file = FileSensor(
    task_id='wait_for_sales_file',
    filepath='/data/sales/{{ ds }}.csv',  # {{ ds }} = execution date
    poke_interval=60,  # check every 60 seconds
    timeout=3600,      # give up after 1 hour
    mode='poke',
    dag=dag
)
# Task 2: extract the data
def extract_sales(**context):
    """Extract data from the file"""
    execution_date = context['ds']
    df = pd.read_csv(f'/data/sales/{execution_date}.csv')
    # Push to XCom for the next task
    context['task_instance'].xcom_push(
        key='row_count',
        value=len(df)
    )
    print(f"✅ Extracted {len(df)} rows")
    return df.to_json()
# Airflow 2 passes the execution context automatically
# (the old provide_context flag is gone)
extract_task = PythonOperator(
    task_id='extract_sales_data',
    python_callable=extract_sales,
    dag=dag
)
# Task 3: transform the data
def transform_sales(**context):
    """Transform the data"""
    from io import StringIO
    # Get the data from the previous task via XCom
    raw_data = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data'
    )
    df = pd.read_json(StringIO(raw_data))
    # Transformations
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df['revenue'] = df['quantity'] * df['price']
    df = df.dropna()
    print(f"✅ Transformed {len(df)} rows")
    return df.to_json()
transform_task = PythonOperator(
    task_id='transform_sales_data',
    python_callable=transform_sales,
    dag=dag
)
# Task 4: load into the database
def load_to_database(**context):
    """Load the data into the database"""
    from io import StringIO
    from sqlalchemy import create_engine
    transformed_data = context['task_instance'].xcom_pull(
        task_ids='transform_sales_data'
    )
    df = pd.read_json(StringIO(transformed_data))
    engine = create_engine('postgresql://user:pass@host:5432/db')
    df.to_sql(
        'sales_fact',
        engine,
        if_exists='append',
        index=False
    )
    print(f"✅ Loaded {len(df)} rows to database")
load_task = PythonOperator(
    task_id='load_to_database',
    python_callable=load_to_database,
    dag=dag
)
# Task 5: generate the report
generate_report = BashOperator(
    task_id='generate_daily_report',
    bash_command="""
    python /scripts/generate_report.py \
        --date {{ ds }} \
        --output /reports/sales_{{ ds }}.pdf
    """,
    dag=dag
)
# Task 6: cleanup
cleanup = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/sales_{{ ds }}*',
    dag=dag
)
# Define the dependencies (execution order)
wait_for_file >> extract_task >> transform_task >> load_task >> generate_report >> cleanup
DAG with branching (conditional execution):
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator  # EmptyOperator in Airflow >= 2.4
def check_data_quality(**context):
    """Decide the next step based on data quality"""
    row_count = context['task_instance'].xcom_pull(
        task_ids='extract_sales_data',
        key='row_count'
    )
    if row_count > 1000:
        return 'process_large_dataset'
    elif row_count > 0:
        return 'process_small_dataset'
    else:
        return 'skip_processing'
branch_task = BranchPythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=dag
)
process_large = PythonOperator(
    task_id='process_large_dataset',
    python_callable=lambda: print("Processing large dataset with Spark"),
    dag=dag
)
process_small = PythonOperator(
    task_id='process_small_dataset',
    python_callable=lambda: print("Processing small dataset with Pandas"),
    dag=dag
)
skip = DummyOperator(
    task_id='skip_processing',
    dag=dag
)
join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',  # continue if at least one branch succeeded
    dag=dag
)
# Dependencies
extract_task >> branch_task >> [process_large, process_small, skip] >> join
TaskGroups (organize related tasks):
from airflow.utils.task_group import TaskGroup
with DAG('complex_pipeline', ...) as dag:
    start = DummyOperator(task_id='start')
    # TaskGroup for customer processing
    with TaskGroup('process_customers', tooltip='Customer data processing') as customer_group:
        extract_customers = PythonOperator(...)
        transform_customers = PythonOperator(...)
        load_customers = PythonOperator(...)
        extract_customers >> transform_customers >> load_customers
    # TaskGroup for order processing
    with TaskGroup('process_orders', tooltip='Order data processing') as order_group:
        extract_orders = PythonOperator(...)
        transform_orders = PythonOperator(...)
        load_orders = PythonOperator(...)
        extract_orders >> transform_orders >> load_orders
    # TaskGroup for analytics
    with TaskGroup('analytics', tooltip='Generate analytics') as analytics_group:
        calculate_metrics = PythonOperator(...)
        generate_dashboard = PythonOperator(...)
        send_notifications = PythonOperator(...)
        calculate_metrics >> [generate_dashboard, send_notifications]
    end = DummyOperator(task_id='end')
    # Dependencies between groups
    start >> [customer_group, order_group] >> analytics_group >> end
Dynamic Task Generation:
from airflow.models import Variable
def create_processing_tasks():
    """Create tasks dynamically from configuration"""
    # Read the configuration (could come from a Variable, a database, etc.)
    sources = Variable.get('data_sources', deserialize_json=True)
    # sources = [
    #     {'name': 'salesforce', 'table': 'accounts'},
    #     {'name': 'hubspot', 'table': 'contacts'},
    #     {'name': 'stripe', 'table': 'payments'}
    # ]
    tasks = []
    for source in sources:
        task = PythonOperator(
            task_id=f"process_{source['name']}_{source['table']}",
            python_callable=process_source,
            op_kwargs={
                'source_name': source['name'],
                'table_name': source['table']
            },
            dag=dag
        )
        tasks.append(task)
    return tasks
def process_source(source_name, table_name):
    """Process one specific source"""
    print(f"Processing {source_name}.{table_name}")
    # Processing logic goes here
# Create the tasks dynamically
start = DummyOperator(task_id='start', dag=dag)
processing_tasks = create_processing_tasks()
end = DummyOperator(task_id='end', dag=dag)
# Dependencies
start >> processing_tasks >> end
Monitoring and Alerting:
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email
def custom_failure_alert(context):
    """Custom callback fired when a task fails"""
    task_instance = context['task_instance']
    exception = context.get('exception')
    subject = f"❌ Airflow Task Failed: {task_instance.task_id}"
    html_content = f"""
    <h3>Task Failed</h3>
    <ul>
        <li><b>DAG:</b> {task_instance.dag_id}</li>
        <li><b>Task:</b> {task_instance.task_id}</li>
        <li><b>Execution Date:</b> {context['execution_date']}</li>
        <li><b>Error:</b> {exception}</li>
        <li><b>Log URL:</b> {task_instance.log_url}</li>
    </ul>
    """
    send_email(
        to=['data-team@example.com'],
        subject=subject,
        html_content=html_content
    )
def data_quality_check(**context):
    """Verify data quality"""
    # Individual checks (check_nulls etc. are defined elsewhere)
    checks = {
        'null_check': check_nulls(),
        'duplicate_check': check_duplicates(),
        'schema_check': check_schema()
    }
    failed_checks = [k for k, v in checks.items() if not v]
    if failed_checks:
        raise ValueError(f"Data quality checks failed: {failed_checks}")
    print("✅ All data quality checks passed")
quality_check = PythonOperator(
    task_id='data_quality_check',
    python_callable=data_quality_check,
    on_failure_callback=custom_failure_alert,
    dag=dag
)
Useful Airflow commands:
# Initialize the metadata database (airflow db migrate in newer versions)
airflow db init
# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
# List DAGs
airflow dags list
# Run a full DAG once for a given date (debugging, no scheduler needed)
airflow dags test daily_sales_etl 2024-01-01
# Test a specific task
airflow tasks test daily_sales_etl extract_sales_data 2024-01-01
# Trigger a DAG manually
airflow dags trigger daily_sales_etl
# Show the state of DAG runs
airflow dags list-runs -d daily_sales_etl
# Pause/unpause a DAG
airflow dags pause daily_sales_etl
airflow dags unpause daily_sales_etl
# Task logs live under $AIRFLOW_HOME/logs (or view them in the web UI)
# Delete all metadata for a DAG (removes its run history entirely)
airflow dags delete daily_sales_etl
Recipe 5.3: Apache Spark - Distributed Processing
What is Spark? A distributed processing engine for big data, typically far faster than classic MapReduce because it keeps intermediate data in memory.
Key concepts:
- RDD (Resilient Distributed Dataset): an immutable distributed collection
- DataFrame: an RDD with a schema (like a SQL table)
- Transformation: a lazy operation (map, filter, join)
- Action: an operation that executes the transformations (collect, count, save) - see the snippet just below
- Partition: a slice of the data, processed in parallel
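Laziness is the part that trips people up, so here is the concept in isolation: transformations only build a plan, and nothing is computed until an action runs. A minimal sketch, assuming a local SparkSession and a hypothetical people.csv with name and age columns:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("LazyDemo").getOrCreate()
# Transformations: these only build a logical plan (Spark may peek at the
# CSV header for column names, but no full scan happens yet)
df = spark.read.option("header", "true").csv("people.csv")
adults = df.filter(col("age") >= 18).select("name", "age")
# Action: this triggers the whole plan (read -> filter -> select)
print(adults.count())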
Spark with PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
# ===== INITIALIZE SPARK =====
spark = SparkSession.builder \
.appName("DataProcessing") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
# ===== READ DATA =====
# CSV
df_sales = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("s3://bucket/sales/*.csv")
# Parquet (optimized columnar format)
df_customers = spark.read.parquet("s3://bucket/customers/")
# JSON
df_events = spark.read.json("s3://bucket/events/*.json")
# Database
df_products = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "products") \
.option("user", "user") \
.option("password", "pass") \
.load()
# ===== BASIC TRANSFORMATIONS =====
# Select columns
df_selected = df_sales.select(
"order_id",
"customer_id",
"total_amount",
"order_date"
)
# Filter
df_filtered = df_sales.filter(
(col("total_amount") > 100) &
(col("order_date") >= "2024-01-01")
)
# Add columns
df_enriched = df_sales.withColumn(
"order_year",
year(col("order_date"))
).withColumn(
"revenue_tier",
when(col("total_amount") < 100, "Low")
.when(col("total_amount") < 500, "Medium")
.otherwise("High")
)
# Rename columns
df_renamed = df_sales.withColumnRenamed("total_amount", "revenue")
# Drop columns
df_dropped = df_sales.drop("unnecessary_column")
# ===== AGGREGATIONS =====
# Group by and aggregate
df_summary = df_sales.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("total_revenue"),
avg("total_amount").alias("avg_order_value"),
max("order_date").alias("last_order_date")
)
# Multiple group by
df_monthly = df_sales.groupBy(
year("order_date").alias("year"),
month("order_date").alias("month")
).agg(
sum("total_amount").alias("monthly_revenue"),
count("order_id").alias("order_count")
).orderBy("year", "month")
# ===== JOINS =====
# Inner join
df_orders_with_customers = df_sales.join(
df_customers,
df_sales.customer_id == df_customers.id,
"inner"
)
# Left join
df_all_orders = df_sales.join(
df_customers,
df_sales.customer_id == df_customers.id,
"left"
)
# Broadcast join (for small tables)
from pyspark.sql.functions import broadcast
df_with_products = df_sales.join(
    broadcast(df_products),  # df_products is small
df_sales.product_id == df_products.id
)
# ===== WINDOW FUNCTIONS =====
# Ranking
window_spec = Window.partitionBy("customer_id").orderBy(desc("order_date"))
df_ranked = df_sales.withColumn(
"order_rank",
row_number().over(window_spec)
).withColumn(
"revenue_rank",
rank().over(Window.partitionBy("customer_id").orderBy(desc("total_amount")))
)
# Running total
window_running = Window.partitionBy("customer_id") \
.orderBy("order_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_cumulative = df_sales.withColumn(
"cumulative_revenue",
sum("total_amount").over(window_running)
)
# Moving average
window_moving = Window.partitionBy("customer_id") \
.orderBy("order_date") \
    .rowsBetween(-6, 0)  # last 7 rows (current + 6 preceding)
df_moving_avg = df_sales.withColumn(
"moving_avg_7",
avg("total_amount").over(window_moving)
)
# ===== DATA CLEANING =====
# Drop nulls
df_clean = df_sales.dropna(subset=["customer_id", "order_date"])
# Fill nulls
df_filled = df_sales.fillna({
"discount": 0,
"notes": "No notes",
"category": "Uncategorized"
})
# Drop duplicates
df_deduped = df_sales.dropDuplicates(["order_id"])
# ===== UDFs (User Defined Functions) =====
from pyspark.sql.types import StringType
# (note: Python UDFs serialize rows one by one; prefer built-in functions when possible)
def categorize_customer(total_revenue):
    """Categorize a customer by revenue"""
    if total_revenue > 10000:
        return "VIP"
    elif total_revenue > 5000:
        return "Premium"
    elif total_revenue > 1000:
        return "Standard"
    else:
        return "Basic"
# Register the UDF
categorize_udf = udf(categorize_customer, StringType())
# Use the UDF
df_categorized = df_summary.withColumn(
"customer_category",
categorize_udf(col("total_revenue"))
)
# ===== OPTIMIZATIONS =====
# 1. Cache to reuse a DataFrame across actions
df_sales.cache()
df_sales.count()  # trigger the cache
# 2. Repartition to balance the load
df_repartitioned = df_sales.repartition(200, "customer_id")
# 3. Coalesce to reduce partitions (after a filter)
df_small = df_filtered.coalesce(10)
# 4. Broadcast hint for joins
df_optimized = df_sales.join(
broadcast(df_products),
"product_id"
)
# ===== WRITE RESULTS =====
# Parquet (recommended for analytics)
df_summary.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("s3://bucket/output/summary/")
# CSV
df_summary.write \
.mode("overwrite") \
.option("header", "true") \
.csv("s3://bucket/output/summary.csv")
# Database
df_summary.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "customer_summary") \
.option("user", "user") \
.option("password", "pass") \
.mode("overwrite") \
.save()
# Delta Lake (versioned, ACID)
df_summary.write \
.format("delta") \
.mode("overwrite") \
.save("s3://bucket/delta/summary")
# ===== FULL USE CASE =====
class SparkETLPipeline:
    """ETL pipeline built on Spark"""
    def __init__(self, spark: SparkSession):
        self.spark = spark
    def extract_sales_data(self, date_from: str, date_to: str):
        """Extract sales data"""
        df = self.spark.read.parquet("s3://bucket/sales/") \
            .filter(
                (col("order_date") >= date_from) &
                (col("order_date") <= date_to)
            )
        print(f"📥 Extracted {df.count()} sales records")
        return df
    def enrich_with_customer_data(self, df_sales):
        """Enrich with customer data"""
        df_customers = self.spark.read.parquet("s3://bucket/customers/")
        df_enriched = df_sales.join(
            df_customers,
            df_sales.customer_id == df_customers.id,
            "left"
        ).select(
            df_sales["*"],
            df_customers["customer_name"],
            df_customers["customer_segment"],
            df_customers["customer_lifetime_value"]
        )
        return df_enriched
    def calculate_metrics(self, df):
        """Compute business metrics"""
        # Customer-level metrics
        df_customer_metrics = df.groupBy("customer_id", "customer_name") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("total_amount").alias("total_revenue"),
                avg("total_amount").alias("avg_order_value"),
                max("order_date").alias("last_purchase_date"),
                min("order_date").alias("first_purchase_date")
            )
        # Add derived metrics
        df_customer_metrics = df_customer_metrics.withColumn(
            "customer_tenure_days",
            datediff(col("last_purchase_date"), col("first_purchase_date"))
        ).withColumn(
            "avg_days_between_orders",
            col("customer_tenure_days") / col("total_orders")
        )
        # Product-level metrics
        df_product_metrics = df.groupBy("product_id", "product_name") \
            .agg(
                count("order_id").alias("times_sold"),
                sum("quantity").alias("total_quantity"),
                sum("total_amount").alias("total_revenue")
            ) \
            .orderBy(desc("total_revenue"))
        return df_customer_metrics, df_product_metrics
    def run(self, date_from: str, date_to: str):
        """Run the full pipeline"""
        print(f"\n{'='*60}")
        print(f"🚀 Starting Spark ETL Pipeline")
        print(f"{'='*60}\n")
        # Extract
        df_sales = self.extract_sales_data(date_from, date_to)
        # Transform
        df_enriched = self.enrich_with_customer_data(df_sales)
        df_customer_metrics, df_product_metrics = self.calculate_metrics(df_enriched)
        # Load
        df_customer_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/customer_metrics/")
        df_product_metrics.write \
            .mode("overwrite") \
            .parquet("s3://bucket/metrics/product_metrics/")
        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")
# Usage
pipeline = SparkETLPipeline(spark)
pipeline.run("2024-01-01", "2024-12-31")
Spark Structured Streaming (real-time processing):
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
    .appName("StreamProcessing") \
    .getOrCreate()
# Read a stream from Kafka
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sales-events") \
    .load()
# Parse JSON
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])
df_parsed = df_stream.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Windowed aggregation (sliding 5-minute windows, 1-minute step)
df_windowed = df_parsed \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "5 minutes", "1 minute"),
        "customer_id"
    ) \
    .agg(
        count("order_id").alias("order_count"),
        sum("amount").alias("total_amount")
    )
# Write the stream (file sinks only support append mode;
# with a watermark, each window is emitted once it is finalized)
query = df_windowed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://bucket/streaming-output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()
query.awaitTermination()
Recipe 5.4: Data Warehousing vs Data Lakes
What are they?
Data Warehouse:
- Structured store optimized for analytics
- Schema-on-write (you define the schema before loading)
- Clean, modeled data
- Examples: Snowflake, BigQuery, Redshift
Data Lake:
- Store for raw data in any format
- Schema-on-read (you define the schema when reading; illustrated in the sketch below)
- All data (structured, semi-structured, unstructured)
- Examples: S3 + Athena, Azure Data Lake, Google Cloud Storage
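The schema-on-write / schema-on-read split is easiest to see side by side. A minimal PySpark sketch; the events table and the S3 path are hypothetical:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
spark = SparkSession.builder.appName("SchemaDemo").getOrCreate()
# Schema-on-write (warehouse style): the table schema exists up front,
# and loads fail if incoming data does not match it
spark.sql("""
    CREATE TABLE IF NOT EXISTS events (
        event_id STRING,
        amount   DOUBLE
    ) USING parquet
""")
# Schema-on-read (lake style): raw JSON sits in storage as-is;
# each reader decides the schema at query time
schema = StructType([
    StructField("event_id", StringType()),
    StructField("amount", DoubleType())
])
df_events = spark.read.schema(schema).json("s3://bucket/raw/events/")  # hypothetical path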
Comparison:
| Characteristic | Data Warehouse | Data Lake |
|---|---|---|
| Data | Structured | All types |
| Schema | Schema-on-write | Schema-on-read |
| Users | Business analysts | Data scientists, engineers |
| Processing | SQL | SQL, Python, Spark |
| Cost | 💰💰💰 High | 💰 Low |
| Performance | 🟢 Fast | 🟡 Depends |
| Flexibility | 🔴 Rigid | 🟢 Very flexible |
| Use cases | BI, reporting | ML, exploration, archives |
Data Lakehouse architecture (the best of both):
┌─────────────────────────────────────────────────────────────┐
│                        Data Sources                         │
├─────────────────────────────────────────────────────────────┤
│  Databases  │  APIs  │  SaaS  │  Logs  │  IoT  │  Files     │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│                    Raw Layer (Data Lake)                    │
├─────────────────────────────────────────────────────────────┤
│  S3 / Azure Data Lake / GCS                                 │
│  • Format: JSON, CSV, Avro, Parquet                         │
│  • No transformation                                        │
│  • Partitioned by date                                      │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (ETL/ELT)
┌─────────────────────────────────────────────────────────────┐
│              Processed Layer (Bronze → Silver)              │
├─────────────────────────────────────────────────────────────┤
│  Delta Lake / Apache Iceberg                                │
│  • Format: Parquet plus metadata                            │
│  • Cleaned and validated                                    │
│  • ACID transactions                                        │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼ (Transformations)
┌─────────────────────────────────────────────────────────────┐
│                    Curated Layer (Gold)                     │
├─────────────────────────────────────────────────────────────┤
│  • Dimensional modeling (star schema)                       │
│  • Aggregations and metrics                                 │
│  • Optimized for queries                                    │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│                      Consumption Layer                      │
├─────────────────────────────────────────────────────────────┤
│   BI Tools  │  ML Models  │  Apps  │  APIs                  │
└─────────────────────────────────────────────────────────────┘
Medallion Architecture (Bronze → Silver → Gold):
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# ===== BRONZE LAYER: raw data =====
def ingest_to_bronze(source_path: str, bronze_path: str):
    """Load raw data without transformation"""
    df_raw = spark.read.json(source_path)
    # Add ingestion metadata (the partition column is derived here)
    df_bronze = df_raw \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_ingested_date", to_date(col("_ingested_at"))) \
        .withColumn("_source_file", input_file_name())
    # Write to Bronze (append only)
    df_bronze.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingested_date") \
        .save(bronze_path)
    print(f"✅ Ingested {df_bronze.count()} records to Bronze")
# ===== SILVER LAYER: clean data =====
def bronze_to_silver(bronze_path: str, silver_path: str):
    """Clean and validate the data"""
    df_bronze = spark.read.format("delta").load(bronze_path)
    # Transformations
    df_silver = df_bronze \
        .dropna(subset=["order_id", "customer_id"]) \
        .dropDuplicates(["order_id"]) \
        .withColumn("order_date", to_date("order_date")) \
        .withColumn("total_amount", col("total_amount").cast("decimal(10,2)")) \
        .filter(col("total_amount") > 0) \
        .select(
            "order_id",
            "customer_id",
            "order_date",
            "total_amount",
            "status",
            "_ingested_at"
        )
    # Incremental merge (upsert)
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_silver = DeltaTable.forPath(spark, silver_path)
        delta_silver.alias("target").merge(
            df_silver.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        df_silver.write \
            .format("delta") \
            .mode("overwrite") \
            .save(silver_path)
    print(f"✅ Processed {df_silver.count()} records to Silver")
# ===== GOLD LAYER: aggregated data for analytics =====
def silver_to_gold(silver_path: str, gold_path: str):
    """Build aggregated tables for BI"""
    df_silver = spark.read.format("delta").load(silver_path)
    # Metric: revenue per day
    df_daily_revenue = df_silver \
        .groupBy("order_date") \
        .agg(
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_order_value")
        ) \
        .orderBy("order_date")
    df_daily_revenue.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/daily_revenue")
    # Metric: customer lifetime value
    window_spec = Window.partitionBy("customer_id").orderBy("order_date")
    df_customer_ltv = df_silver \
        .withColumn("cumulative_revenue", sum("total_amount").over(window_spec)) \
        .groupBy("customer_id") \
        .agg(
            count("order_id").alias("total_orders"),
            max("cumulative_revenue").alias("lifetime_value"),
            max("order_date").alias("last_order_date")
        )
    df_customer_ltv.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{gold_path}/customer_ltv")
    print("✅ Created Gold layer tables")
# Full pipeline
def run_medallion_pipeline():
    """Run the complete Bronze → Silver → Gold pipeline"""
    # Paths
    source_path = "s3://raw-data/orders/*.json"
    bronze_path = "s3://datalake/bronze/orders"
    silver_path = "s3://datalake/silver/orders"
    gold_path = "s3://datalake/gold"
    # Execute the layers
    ingest_to_bronze(source_path, bronze_path)
    bronze_to_silver(bronze_path, silver_path)
    silver_to_gold(silver_path, gold_path)
    print("\n✅ Medallion pipeline completed")
run_medallion_pipeline()
Time travel with Delta Lake:
# Read a specific version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path")
# Read a snapshot as of a timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("path")
# View the change history
delta_table = DeltaTable.forPath(spark, "path")
delta_table.history().show()
# Roll back to a previous version
delta_table.restoreToVersion(5)
Recipe 5.5: Stream Processing with Kafka
What is Kafka? A distributed streaming platform for publishing, subscribing to, and processing data streams in real time.
Key concepts:
- Topic: a channel of messages (like a database table)
- Producer: publishes messages to a topic
- Consumer: reads messages from a topic
- Partition: a slice of a topic, used for parallelism (the snippet below creates a topic with several partitions)
- Offset: a message's position within a partition
- Consumer Group: a group of consumers that split the partitions among themselves
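The examples below assume the sales-events topic already exists. A minimal sketch for creating it with a few partitions using kafka-python's admin client (broker address assumed to be localhost:9092):
from kafka.admin import KafkaAdminClient, NewTopic
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# 3 partitions -> up to 3 consumers in the same group can read in parallel
admin.create_topics([
    NewTopic(name='sales-events', num_partitions=3, replication_factor=1)
])
admin.close()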
Producer (Python):
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
from datetime import datetime
import time
class SalesEventProducer:
    """Producer for sales events"""
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for confirmation from all replicas
            retries=3,
            max_in_flight_requests_per_connection=1  # guarantee ordering
        )
        self.topic = 'sales-events'
    def send_event(self, event: dict):
        """Send a sales event"""
        # Key used for partitioning (all events for the same customer land on the same partition)
        key = event.get('customer_id')
        # Add a timestamp
        event['timestamp'] = datetime.now().isoformat()
        # Send asynchronously
        future = self.producer.send(
            self.topic,
            key=key,
            value=event
        )
        # Block on the future to handle success/error
        try:
            record_metadata = future.get(timeout=10)
            print(f"✅ Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")
        except KafkaError as e:
            print(f"❌ Failed to send: {e}")
    def send_batch(self, events: list):
        """Send multiple events"""
        for event in events:
            self.send_event(event)
        # Flush to make sure everything was sent
        self.producer.flush()
    def close(self):
        """Close the producer"""
        self.producer.close()
# Usage
producer = SalesEventProducer()
# Simulate events
for i in range(100):
    event = {
        'order_id': f'order_{i}',
        'customer_id': f'customer_{i % 10}',  # 10 customers
        'product_id': f'product_{i % 5}',
        'quantity': 1,
        'amount': 99.99
    }
    producer.send_event(event)
    time.sleep(0.1)
producer.close()
Consumer (Python):
from kafka import KafkaConsumer
import json
class SalesEventConsumer:
    """Consumer for sales events"""
    def __init__(
        self,
        bootstrap_servers=['localhost:9092'],
        group_id='sales-processor'
    ):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',  # 'earliest' or 'latest'
            enable_auto_commit=False,  # manual commits for full control
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            max_poll_records=100  # batch size
        )
    def process_events(self):
        """Process events continuously"""
        try:
            for message in self.consumer:
                # Get the event
                event = message.value
                print(f"\n📨 Received from partition {message.partition}, offset {message.offset}")
                print(f"Event: {event}")
                # Process the event
                try:
                    self.process_single_event(event)
                    # Commit the offset after successful processing
                    self.consumer.commit()
                except Exception as e:
                    print(f"❌ Error processing event: {e}")
                    # Don't commit; the event will be reprocessed on the next poll
        except KeyboardInterrupt:
            print("\n🛑 Stopping consumer...")
        finally:
            self.consumer.close()
    def process_single_event(self, event: dict):
        """Process a single event"""
        # Validate the event
        if not event.get('order_id'):
            raise ValueError("Missing order_id")
        # Business logic
        print(f"Processing order {event['order_id']} for customer {event['customer_id']}")
        # Save to the database, send a notification, etc.
        # save_to_database(event)
        # send_notification(event)
    def process_batch(self):
        """Process events in batches"""
        batch = []
        batch_size = 100
        try:
            for message in self.consumer:
                batch.append(message.value)
                if len(batch) >= batch_size:
                    # Process the batch
                    self.process_batch_events(batch)
                    # Commit
                    self.consumer.commit()
                    batch = []
        except KeyboardInterrupt:
            # Process the remaining batch
            if batch:
                self.process_batch_events(batch)
                self.consumer.commit()
        finally:
            self.consumer.close()
    def process_batch_events(self, events: list):
        """Process a batch of events"""
        print(f"📦 Processing batch of {len(events)} events")
        # Bulk insert into a database, etc.
        # bulk_insert_to_db(events)
# Usage
consumer = SalesEventConsumer()
consumer.process_events()
Kafka Streams-style stream processing (conceptual, in Python):
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime, timedelta
class StreamProcessor:
    """Stream processing with windowed aggregations"""
    def __init__(self):
        self.consumer = KafkaConsumer(
            'sales-events',
            bootstrap_servers=['localhost:9092'],
            group_id='stream-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        # State store (in production: use RocksDB or similar)
        self.state = defaultdict(lambda: {
            'total_amount': 0,
            'count': 0,
            'window_start': None
        })
    def process_stream(self):
        """Process the stream with time-windowed aggregations"""
        window_duration = timedelta(minutes=5)
        for message in self.consumer:
            event = message.value
            customer_id = event['customer_id']
            current_time = datetime.now()
            state = self.state[customer_id]
            if state['window_start'] is None:
                state['window_start'] = current_time
            # If the window has expired, emit the aggregate BEFORE folding in the new event
            if current_time - state['window_start'] >= window_duration:
                result = {
                    'customer_id': customer_id,
                    'window_start': state['window_start'].isoformat(),
                    'window_end': current_time.isoformat(),
                    'total_amount': state['total_amount'],
                    'order_count': state['count'],
                    'avg_order_value': state['total_amount'] / state['count']
                }
                # Send to the results topic
                self.producer.send('customer-metrics', value=result)
                # Reset the state for a new window
                state['total_amount'] = 0
                state['count'] = 0
                state['window_start'] = current_time
            # Update the state with the new event
            state['total_amount'] += event['amount']
            state['count'] += 1
            # Commit
            self.consumer.commit()
# Usage
processor = StreamProcessor()
processor.process_stream()
Kafka with Apache Flink (advanced processing):
# Conceptual example (Flink is primarily used from Java/Scala)
import json
from pyflink.common import Time
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.window import TumblingEventTimeWindows
# Set up the environment
env = StreamExecutionEnvironment.get_execution_environment()
# Kafka source
kafka_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'flink-consumer'
}
kafka_source = FlinkKafkaConsumer(
    topics='sales-events',
    deserialization_schema=SimpleStringSchema(),
    properties=kafka_props
)
# Stream pipeline
stream = env.add_source(kafka_source)
# Parse JSON and key by customer
parsed_stream = stream.map(lambda x: json.loads(x)) \
    .key_by(lambda x: x['customer_id'])
# Windowed aggregation (CustomerRevenueAggregator is a user-defined AggregateFunction, omitted here)
result_stream = parsed_stream \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .aggregate(CustomerRevenueAggregator())
# Sink to output (kafka_sink: a Kafka producer sink for the output topic, omitted here)
result_stream.add_sink(kafka_sink)
# Execute
env.execute("Sales Stream Processing")