📊 Developer Cookbook - FASE 5: Ingeniería de Datos
Recetas prácticas para construir pipelines de datos robustos y escalables
📚 Tabla de Contenidos
Pipelines de Datos (Recetas 5.1 – 5.5)
SQL y Python para Data Engineering (Recetas 5.6 – 5.8)
Pipelines de Datos
Receta 5.1: ETL vs ELT - ¿Cuál usar?
¿Qué son?
- ETL (Extract, Transform, Load): Transformar datos ANTES de cargarlos al destino
- ELT (Extract, Load, Transform): Cargar datos crudos y transformar DESPUÉS en el destino
Comparación:
| Característica | ETL | ELT |
|---|---|---|
| Transformación | Antes de cargar | Después de cargar |
| Velocidad inicial | 🟡 Más lenta | 🟢 Más rápida |
| Flexibilidad | 🔴 Menos flexible | 🟢 Muy flexible |
| Costos compute | 🟢 Predecible | 🟡 Variable |
| Mejor para | Data warehouses tradicionales | Cloud data warehouses |
| Storage | 🟢 Menos storage | 🔴 Más storage |
| Casos de uso | Datos sensibles, compliance | Exploración, analytics |
Ejemplo ETL con Python:
import pandas as pd
from datetime import datetime
import psycopg2
from typing import Dict, List
class ETLPipeline:
"""Pipeline ETL tradicional"""
def __init__(self, source_config: Dict, target_config: Dict):
self.source_config = source_config
self.target_config = target_config
def extract(self, query: str) -> pd.DataFrame:
"""E - Extract: Extraer datos de la fuente"""
print("📥 Extracting data from source...")
# Conectar a base de datos source
conn = psycopg2.connect(**self.source_config)
# Extraer datos
df = pd.read_sql(query, conn)
conn.close()
print(f"✅ Extracted {len(df)} rows")
return df
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""T - Transform: Limpiar y transformar datos"""
print("⚙️ Transforming data...")
# 1. Limpiar valores nulos
df = df.dropna(subset=['customer_id', 'order_date'])
# 2. Normalizar columnas de texto
df['customer_name'] = df['customer_name'].str.strip().str.title()
df['email'] = df['email'].str.lower()
# 3. Convertir tipos de datos
df['order_date'] = pd.to_datetime(df['order_date'])
df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')
# 4. Crear columnas derivadas
df['order_year'] = df['order_date'].dt.year
df['order_month'] = df['order_date'].dt.month
df['order_quarter'] = df['order_date'].dt.quarter
# 5. Categorizar clientes
df['customer_segment'] = pd.cut(
df['total_amount'],
bins=[0, 100, 500, 1000, float('inf')],
labels=['Bronze', 'Silver', 'Gold', 'Platinum']
)
# 6. Remover duplicados
df = df.drop_duplicates(subset=['order_id'])
# 7. Agregar metadata
df['etl_processed_at'] = datetime.now()
df['etl_version'] = '1.0'
print(f"✅ Transformed to {len(df)} rows")
return df
def load(self, df: pd.DataFrame, table_name: str):
"""L - Load: Cargar datos al destino"""
print(f"📤 Loading data to {table_name}...")
# Conectar a base de datos target
conn = psycopg2.connect(**self.target_config)
cursor = conn.cursor()
# Truncar tabla si existe (full refresh)
cursor.execute(f"TRUNCATE TABLE {table_name}")
# Bulk insert
from io import StringIO
buffer = StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cursor.copy_from(
buffer,
table_name,
sep=',',
null='',
columns=df.columns.tolist()
)
conn.commit()
cursor.close()
conn.close()
print(f"✅ Loaded {len(df)} rows to {table_name}")
def run(self, extract_query: str, target_table: str):
"""Ejecutar pipeline completo"""
start_time = datetime.now()
print(f"\n{'='*60}")
print(f"🚀 Starting ETL Pipeline - {start_time}")
print(f"{'='*60}\n")
try:
# Extract
raw_data = self.extract(extract_query)
# Transform
clean_data = self.transform(raw_data)
# Load
self.load(clean_data, target_table)
# Log success
duration = (datetime.now() - start_time).total_seconds()
print(f"\n{'='*60}")
print(f"✅ ETL Pipeline completed successfully in {duration:.2f}s")
print(f"{'='*60}\n")
except Exception as e:
print(f"\n❌ ETL Pipeline failed: {str(e)}")
raise
# Uso
source_config = {
'host': 'source-db.example.com',
'database': 'production',
'user': 'etl_user',
'password': 'password'
}
target_config = {
'host': 'warehouse.example.com',
'database': 'analytics',
'user': 'warehouse_user',
'password': 'password'
}
pipeline = ETLPipeline(source_config, target_config)
extract_query = """
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.email,
o.order_date,
o.total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
"""
pipeline.run(extract_query, 'dim_orders')
Ejemplo ELT con dbt (Data Build Tool):
-- models/staging/stg_orders.sql
-- Paso 1: Cargar datos crudos (ya están en el warehouse)
{{
config(
materialized='view'
)
}}
SELECT
order_id,
customer_id,
order_date,
total_amount,
_loaded_at
FROM {{ source('raw', 'orders') }}
-- models/intermediate/int_orders_cleaned.sql
-- Paso 2: Limpiar datos
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
SELECT
order_id,
customer_id,
CAST(order_date AS DATE) as order_date,
ROUND(total_amount, 2) as total_amount,
EXTRACT(YEAR FROM order_date) as order_year,
EXTRACT(MONTH FROM order_date) as order_month,
EXTRACT(QUARTER FROM order_date) as order_quarter
FROM {{ ref('stg_orders') }}
WHERE order_date IS NOT NULL
AND total_amount > 0
{% if is_incremental() %}
-- Solo procesar nuevos registros
AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
-- models/marts/fct_orders.sql
-- Paso 3: Crear tabla final para analytics
{{
config(
materialized='table'
)
}}
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_segment,
o.order_date,
o.order_year,
o.order_month,
o.total_amount,
CASE
WHEN o.total_amount < 100 THEN 'Bronze'
WHEN o.total_amount < 500 THEN 'Silver'
WHEN o.total_amount < 1000 THEN 'Gold'
ELSE 'Platinum'
END as order_tier
FROM {{ ref('int_orders_cleaned') }} o
LEFT JOIN {{ ref('dim_customers') }} c
ON o.customer_id = c.customer_id
Cuándo usar cada uno:
ETL:
- ✅ Datos sensibles (PII, compliance)
- ✅ Transformaciones complejas que requieren mucho CPU
- ✅ Target database con recursos limitados
- ✅ Necesitas validar antes de cargar
- ✅ On-premise data warehouses
ELT:
- ✅ Cloud data warehouses (Snowflake, BigQuery, Redshift)
- ✅ Exploración de datos (no sabes qué transformaciones necesitas)
- ✅ Múltiples equipos con diferentes necesidades
- ✅ Cambios frecuentes en lógica de transformación
- ✅ Data lakes con datos semi-estructurados
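Para contrastar con la clase ETLPipeline de arriba, un sketch mínimo (y simplificado) del patrón ELT en Python: el dato crudo se carga tal cual al warehouse y la transformación se ejecuta después como SQL dentro del propio warehouse. Los nombres de tablas y la cadena de conexión son supuestos ilustrativos:
import pandas as pd
from sqlalchemy import create_engine, text

# Supuestos ilustrativos: tabla raw_orders y vista stg_orders en el warehouse
engine = create_engine('postgresql://warehouse_user:password@warehouse.example.com:5432/analytics')

# E + L: cargar el dato crudo sin transformar
df_raw = pd.read_csv('orders.csv')
df_raw.to_sql('raw_orders', engine, if_exists='append', index=False)

# T: la transformación vive como SQL dentro del warehouse (estilo ELT)
with engine.begin() as conn:
    conn.execute(text("""
        CREATE OR REPLACE VIEW stg_orders AS
        SELECT
            order_id,
            customer_id,
            CAST(order_date AS DATE) AS order_date,
            ROUND(total_amount, 2)   AS total_amount
        FROM raw_orders
        WHERE total_amount > 0
    """))
En la práctica, ese SQL de transformación suele gestionarse con dbt (como en el ejemplo anterior) en lugar de quedar embebido en Python.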
Receta 5.2: Apache Airflow - Orquestación de Pipelines
¿Qué es Airflow? Plataforma para crear, programar y monitorear workflows de datos como código (DAGs).
Conceptos clave:
- DAG (Directed Acyclic Graph): Workflow completo
- Task: Unidad de trabajo (un nodo del DAG)
- Operator: Template para crear tasks (PythonOperator, BashOperator, etc.)
- Sensor: Task que espera por una condición
- XCom: Intercambio de datos entre tasks
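Estos conceptos se pueden ver en miniatura con la TaskFlow API de Airflow 2.x (sketch ilustrativo con nombres de tasks inventados): el valor de retorno de cada @task viaja por XCom y las dependencias se infieren de las llamadas. El DAG clásico con operators se muestra a continuación:
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['ejemplo'])
def mini_etl():
    @task
    def extraer() -> list:
        return [10, 20, 30]            # el retorno se publica automáticamente como XCom

    @task
    def transformar(valores: list) -> int:
        return sum(valores)            # recibe el XCom de la task anterior

    @task
    def cargar(total: int):
        print(f"Total cargado: {total}")

    cargar(transformar(extraer()))     # equivale a extraer >> transformar >> cargar

mini_etl()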
DAG básico:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta
import pandas as pd
# Default arguments para todas las tasks
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=1)
}
# Definir DAG
dag = DAG(
'daily_sales_etl',
default_args=default_args,
description='Daily sales ETL pipeline',
schedule_interval='0 2 * * *', # Todos los días a las 2am
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['sales', 'etl', 'daily']
)
# Task 1: Verificar que archivo existe
wait_for_file = FileSensor(
task_id='wait_for_sales_file',
filepath='/data/sales/{{ ds }}.csv', # {{ ds }} = execution date
poke_interval=60, # Check cada 60 segundos
timeout=3600, # Timeout después de 1 hora
mode='poke',
dag=dag
)
# Task 2: Extraer datos
def extract_sales(**context):
"""Extraer datos del archivo"""
execution_date = context['ds']
df = pd.read_csv(f'/data/sales/{execution_date}.csv')
# Guardar en XCom para siguiente task
context['task_instance'].xcom_push(
key='row_count',
value=len(df)
)
print(f"✅ Extracted {len(df)} rows")
return df.to_json()
extract_task = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_sales,
provide_context=True,
dag=dag
)
# Task 3: Transformar datos
def transform_sales(**context):
"""Transformar datos"""
# Obtener datos de task anterior via XCom
raw_data = context['task_instance'].xcom_pull(
task_ids='extract_sales_data'
)
df = pd.read_json(raw_data)
# Transformaciones
df['sale_date'] = pd.to_datetime(df['sale_date'])
df['revenue'] = df['quantity'] * df['price']
df = df.dropna()
print(f"✅ Transformed {len(df)} rows")
return df.to_json()
transform_task = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_sales,
provide_context=True,
dag=dag
)
# Task 4: Cargar a database
def load_to_database(**context):
"""Cargar datos a database"""
from sqlalchemy import create_engine
transformed_data = context['task_instance'].xcom_pull(
task_ids='transform_sales_data'
)
df = pd.read_json(transformed_data)
engine = create_engine('postgresql://user:pass@host:5432/db')
df.to_sql(
'sales_fact',
engine,
if_exists='append',
index=False
)
print(f"✅ Loaded {len(df)} rows to database")
load_task = PythonOperator(
task_id='load_to_database',
python_callable=load_to_database,
provide_context=True,
dag=dag
)
# Task 5: Generar reporte
generate_report = BashOperator(
task_id='generate_daily_report',
bash_command="""
python /scripts/generate_report.py \
--date {{ ds }} \
--output /reports/sales_{{ ds }}.pdf
""",
dag=dag
)
# Task 6: Cleanup
cleanup = BashOperator(
task_id='cleanup_temp_files',
bash_command='rm -f /tmp/sales_{{ ds }}*',
dag=dag
)
# Definir dependencias (orden de ejecución)
wait_for_file >> extract_task >> transform_task >> load_task >> generate_report >> cleanup
DAG con branching (conditional execution):
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
def check_data_quality(**context):
"""Decidir siguiente paso basado en calidad de datos"""
row_count = context['task_instance'].xcom_pull(
task_ids='extract_sales_data',
key='row_count'
)
if row_count > 1000:
return 'process_large_dataset'
elif row_count > 0:
return 'process_small_dataset'
else:
return 'skip_processing'
branch_task = BranchPythonOperator(
task_id='check_data_quality',
python_callable=check_data_quality,
provide_context=True,
dag=dag
)
process_large = PythonOperator(
task_id='process_large_dataset',
python_callable=lambda: print("Processing large dataset with Spark"),
dag=dag
)
process_small = PythonOperator(
task_id='process_small_dataset',
python_callable=lambda: print("Processing small dataset with Pandas"),
dag=dag
)
skip = DummyOperator(
task_id='skip_processing',
dag=dag
)
join = DummyOperator(
task_id='join',
trigger_rule='none_failed_min_one_success', # Continuar si al menos uno succeeded
dag=dag
)
# Dependencias
extract_task >> branch_task >> [process_large, process_small, skip] >> join
TaskGroups (organizar tasks relacionadas):
from airflow.utils.task_group import TaskGroup
with DAG('complex_pipeline', ...) as dag:
start = DummyOperator(task_id='start')
# TaskGroup para procesamiento de customers
with TaskGroup('process_customers', tooltip='Customer data processing') as customer_group:
extract_customers = PythonOperator(...)
transform_customers = PythonOperator(...)
load_customers = PythonOperator(...)
extract_customers >> transform_customers >> load_customers
# TaskGroup para procesamiento de orders
with TaskGroup('process_orders', tooltip='Order data processing') as order_group:
extract_orders = PythonOperator(...)
transform_orders = PythonOperator(...)
load_orders = PythonOperator(...)
extract_orders >> transform_orders >> load_orders
# TaskGroup para analytics
with TaskGroup('analytics', tooltip='Generate analytics') as analytics_group:
calculate_metrics = PythonOperator(...)
generate_dashboard = PythonOperator(...)
send_notifications = PythonOperator(...)
calculate_metrics >> [generate_dashboard, send_notifications]
end = DummyOperator(task_id='end')
# Dependencias entre grupos
start >> [customer_group, order_group] >> analytics_group >> end
Dynamic Task Generation:
from airflow.models import Variable
def create_processing_tasks():
"""Crear tasks dinámicamente basado en configuración"""
# Leer configuración (puede venir de Variable, database, etc.)
sources = Variable.get('data_sources', deserialize_json=True)
# sources = [
# {'name': 'salesforce', 'table': 'accounts'},
# {'name': 'hubspot', 'table': 'contacts'},
# {'name': 'stripe', 'table': 'payments'}
# ]
tasks = []
for source in sources:
task = PythonOperator(
task_id=f"process_{source['name']}_{source['table']}",
python_callable=process_source,
op_kwargs={
'source_name': source['name'],
'table_name': source['table']
},
dag=dag
)
tasks.append(task)
return tasks
def process_source(source_name, table_name):
"""Procesar un source específico"""
print(f"Processing {source_name}.{table_name}")
# Lógica de procesamiento
# Crear tasks dinámicamente
start = DummyOperator(task_id='start', dag=dag)
processing_tasks = create_processing_tasks()
end = DummyOperator(task_id='end', dag=dag)
# Dependencias
start >> processing_tasks >> end
Monitoring y Alerting:
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email
def custom_failure_alert(context):
"""Callback personalizado cuando falla una task"""
task_instance = context['task_instance']
exception = context.get('exception')
subject = f"❌ Airflow Task Failed: {task_instance.task_id}"
html_content = f"""
<h3>Task Failed</h3>
<ul>
<li><b>DAG:</b> {task_instance.dag_id}</li>
<li><b>Task:</b> {task_instance.task_id}</li>
<li><b>Execution Date:</b> {context['execution_date']}</li>
<li><b>Error:</b> {exception}</li>
<li><b>Log URL:</b> {task_instance.log_url}</li>
</ul>
"""
send_email(
to=['data-team@example.com'],
subject=subject,
html_content=html_content
)
def data_quality_check(**context):
"""Verificar calidad de datos"""
# Verificaciones (check_nulls, check_duplicates y check_schema serían funciones de validación propias)
checks = {
'null_check': check_nulls(),
'duplicate_check': check_duplicates(),
'schema_check': check_schema()
}
failed_checks = [k for k, v in checks.items() if not v]
if failed_checks:
raise ValueError(f"Data quality checks failed: {failed_checks}")
print("✅ All data quality checks passed")
quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=data_quality_check,
on_failure_callback=custom_failure_alert,
dag=dag
)
Comandos Airflow útiles:
# Inicializar database
airflow db init
# Crear usuario admin
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# Listar DAGs
airflow dags list
# Ejecutar el DAG completo en modo test para una fecha (sin scheduler)
airflow dags test daily_sales_etl 2024-01-01
# Test task específica
airflow tasks test daily_sales_etl extract_sales_data 2024-01-01
# Trigger DAG manualmente
airflow dags trigger daily_sales_etl
# Ver estado de DAG runs
airflow dags list-runs -d daily_sales_etl
# Pausar/Despausar DAG
airflow dags pause daily_sales_etl
airflow dags unpause daily_sales_etl
# Ver el estado de una task en un DAG run
airflow tasks state daily_sales_etl extract_sales_data 2024-01-01
# Eliminar el DAG y todos sus metadatos (runs e instancias de tasks)
airflow dags delete daily_sales_etl
Receta 5.3: Apache Spark - Procesamiento Distribuido
¿Qué es Spark? Motor de procesamiento distribuido para big data, mucho más rápido que MapReduce.
Conceptos clave:
- RDD (Resilient Distributed Dataset): Colección distribuida inmutable
- DataFrame: RDD con schema (como tabla SQL)
- Transformation: Operación lazy (map, filter, join)
- Action: Operación que ejecuta las transformations (collect, count, save)
- Partition: División de datos para procesamiento paralelo
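Un sketch mínimo para ilustrar la diferencia entre transformations y actions de la lista anterior: las transformations solo construyen el plan de ejecución y nada se lee ni se procesa hasta que se invoca una action (las rutas son ilustrativas):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LazyDemo").getOrCreate()

df = spark.read.parquet("s3://bucket/sales/")                    # define la fuente
df_filtrado = df.filter(col("total_amount") > 100)               # transformation (lazy)
df_proyectado = df_filtrado.select("order_id", "total_amount")   # transformation (lazy)

df_proyectado.explain()           # muestra el plan físico sin ejecutar el job
total = df_proyectado.count()     # action: recién aquí Spark lee y procesa los datos
primeras = df_proyectado.take(5)  # action: re-ejecuta el plan (salvo que haya cache())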
Spark con PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
# ===== INICIALIZAR SPARK =====
spark = SparkSession.builder \
.appName("DataProcessing") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()
# ===== LEER DATOS =====
# CSV
df_sales = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("s3://bucket/sales/*.csv")
# Parquet (formato columnar optimizado)
df_customers = spark.read.parquet("s3://bucket/customers/")
# JSON
df_events = spark.read.json("s3://bucket/events/*.json")
# Database
df_products = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "products") \
.option("user", "user") \
.option("password", "pass") \
.load()
# ===== TRANSFORMACIONES BÁSICAS =====
# Select columnas
df_selected = df_sales.select(
"order_id",
"customer_id",
"total_amount",
"order_date"
)
# Filter
df_filtered = df_sales.filter(
(col("total_amount") > 100) &
(col("order_date") >= "2024-01-01")
)
# Agregar columnas
df_enriched = df_sales.withColumn(
"order_year",
year(col("order_date"))
).withColumn(
"revenue_tier",
when(col("total_amount") < 100, "Low")
.when(col("total_amount") < 500, "Medium")
.otherwise("High")
)
# Renombrar columnas
df_renamed = df_sales.withColumnRenamed("total_amount", "revenue")
# Drop columnas
df_dropped = df_sales.drop("unnecessary_column")
# ===== AGREGACIONES =====
# Group by y aggregate
df_summary = df_sales.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("total_revenue"),
avg("total_amount").alias("avg_order_value"),
max("order_date").alias("last_order_date")
)
# Multiple group by
df_monthly = df_sales.groupBy(
year("order_date").alias("year"),
month("order_date").alias("month")
).agg(
sum("total_amount").alias("monthly_revenue"),
count("order_id").alias("order_count")
).orderBy("year", "month")
# ===== JOINS =====
# Inner join
df_orders_with_customers = df_sales.join(
df_customers,
df_sales.customer_id == df_customers.id,
"inner"
)
# Left join
df_all_orders = df_sales.join(
df_customers,
df_sales.customer_id == df_customers.id,
"left"
)
# Broadcast join (para tablas pequeñas)
from pyspark.sql.functions import broadcast
df_with_products = df_sales.join(
broadcast(df_products), # df_products es pequeño
df_sales.product_id == df_products.id
)
# ===== WINDOW FUNCTIONS =====
# Ranking
window_spec = Window.partitionBy("customer_id").orderBy(desc("order_date"))
df_ranked = df_sales.withColumn(
"order_rank",
row_number().over(window_spec)
).withColumn(
"revenue_rank",
rank().over(Window.partitionBy("customer_id").orderBy(desc("total_amount")))
)
# Running total
window_running = Window.partitionBy("customer_id") \
.orderBy("order_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_cumulative = df_sales.withColumn(
"cumulative_revenue",
sum("total_amount").over(window_running)
)
# Moving average
window_moving = Window.partitionBy("customer_id") \
.orderBy("order_date") \
.rowsBetween(-6, 0) # Últimos 7 registros
df_moving_avg = df_sales.withColumn(
"moving_avg_7",
avg("total_amount").over(window_moving)
)
# ===== LIMPIEZA DE DATOS =====
# Remover nulls
df_clean = df_sales.dropna(subset=["customer_id", "order_date"])
# Rellenar nulls
df_filled = df_sales.fillna({
"discount": 0,
"notes": "No notes",
"category": "Uncategorized"
})
# Remover duplicados
df_deduped = df_sales.dropDuplicates(["order_id"])
# ===== UDFs (User Defined Functions) =====
from pyspark.sql.types import StringType
def categorize_customer(total_revenue):
"""Categorizar cliente por revenue"""
if total_revenue > 10000:
return "VIP"
elif total_revenue > 5000:
return "Premium"
elif total_revenue > 1000:
return "Standard"
else:
return "Basic"
# Registrar UDF
categorize_udf = udf(categorize_customer, StringType())
# Usar UDF
df_categorized = df_summary.withColumn(
"customer_category",
categorize_udf(col("total_revenue"))
)
# ===== OPTIMIZACIONES =====
# 1. Cache para reutilizar DataFrame
df_sales.cache()
df_sales.count() # Trigger cache
# 2. Repartition para balancear carga
df_repartitioned = df_sales.repartition(200, "customer_id")
# 3. Coalesce para reducir partitions (después de filter)
df_small = df_filtered.coalesce(10)
# 4. Broadcast hint para joins
df_optimized = df_sales.join(
broadcast(df_products),
"product_id"
)
# ===== ESCRIBIR RESULTADOS =====
# Parquet (recomendado para analytics)
df_summary.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("s3://bucket/output/summary/")
# CSV
df_summary.write \
.mode("overwrite") \
.option("header", "true") \
.csv("s3://bucket/output/summary.csv")
# Database
df_summary.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "customer_summary") \
.option("user", "user") \
.option("password", "pass") \
.mode("overwrite") \
.save()
# Delta Lake (versionado, ACID)
df_summary.write \
.format("delta") \
.mode("overwrite") \
.save("s3://bucket/delta/summary")
# ===== CASO DE USO COMPLETO =====
class SparkETLPipeline:
"""Pipeline ETL con Spark"""
def __init__(self, spark: SparkSession):
self.spark = spark
def extract_sales_data(self, date_from: str, date_to: str):
"""Extraer datos de ventas"""
df = self.spark.read.parquet("s3://bucket/sales/") \
.filter(
(col("order_date") >= date_from) &
(col("order_date") <= date_to)
)
print(f"📥 Extracted {df.count()} sales records")
return df
def enrich_with_customer_data(self, df_sales):
"""Enriquecer con datos de clientes"""
df_customers = self.spark.read.parquet("s3://bucket/customers/")
df_enriched = df_sales.join(
df_customers,
df_sales.customer_id == df_customers.id,
"left"
).select(
df_sales["*"],
df_customers["customer_name"],
df_customers["customer_segment"],
df_customers["customer_lifetime_value"]
)
return df_enriched
def calculate_metrics(self, df):
"""Calcular métricas de negocio"""
# Customer-level metrics
df_customer_metrics = df.groupBy("customer_id", "customer_name") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("total_revenue"),
avg("total_amount").alias("avg_order_value"),
max("order_date").alias("last_purchase_date"),
min("order_date").alias("first_purchase_date")
)
# Add derived metrics
df_customer_metrics = df_customer_metrics.withColumn(
"customer_tenure_days",
datediff(col("last_purchase_date"), col("first_purchase_date"))
).withColumn(
"avg_days_between_orders",
col("customer_tenure_days") / col("total_orders")
)
# Product-level metrics
df_product_metrics = df.groupBy("product_id", "product_name") \
.agg(
count("order_id").alias("times_sold"),
sum("quantity").alias("total_quantity"),
sum("total_amount").alias("total_revenue")
) \
.orderBy(desc("total_revenue"))
return df_customer_metrics, df_product_metrics
def run(self, date_from: str, date_to: str):
"""Ejecutar pipeline completo"""
print(f"\n{'='*60}")
print(f"🚀 Starting Spark ETL Pipeline")
print(f"{'='*60}\n")
# Extract
df_sales = self.extract_sales_data(date_from, date_to)
# Transform
df_enriched = self.enrich_with_customer_data(df_sales)
df_customer_metrics, df_product_metrics = self.calculate_metrics(df_enriched)
# Load
df_customer_metrics.write \
.mode("overwrite") \
.parquet("s3://bucket/metrics/customer_metrics/")
df_product_metrics.write \
.mode("overwrite") \
.parquet("s3://bucket/metrics/product_metrics/")
print(f"\n{'='*60}")
print(f"✅ Pipeline completed successfully")
print(f"{'='*60}\n")
# Uso
pipeline = SparkETLPipeline(spark)
pipeline.run("2024-01-01", "2024-12-31")
Spark Streaming (procesamiento en tiempo real):
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("StreamProcessing") \
.getOrCreate()
# Leer stream desde Kafka
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales-events") \
.load()
# Parse JSON
schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("amount", DoubleType()),
StructField("timestamp", TimestampType())
])
df_parsed = df_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Windowed aggregation (últimos 5 minutos)
df_windowed = df_parsed \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "5 minutes", "1 minute"),
"customer_id"
) \
.agg(
count("order_id").alias("order_count"),
sum("amount").alias("total_amount")
)
# Escribir stream (el sink de archivos solo soporta outputMode "append";
# con el watermark, cada ventana se emite una vez cerrada)
query = df_windowed.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "s3://bucket/streaming-output/") \
.option("checkpointLocation", "s3://bucket/checkpoints/") \
.start()
query.awaitTermination()
Receta 5.4: Data Warehousing vs Data Lakes
¿Qué son?
Data Warehouse:
- Almacén estructurado y optimizado para analytics
- Schema-on-write (defines schema antes de cargar)
- Datos limpios y modelados
- Ejemplos: Snowflake, BigQuery, Redshift
Data Lake:
- Almacén de datos crudos en cualquier formato
- Schema-on-read (defines schema al leer)
- Todos los datos (estructurados, semi-estructurados, no-estructurados)
- Ejemplos: S3 + Athena, Azure Data Lake, Google Cloud Storage
Comparación:
| Característica | Data Warehouse | Data Lake |
|---|---|---|
| Datos | Estructurados | Todos los tipos |
| Schema | Schema-on-write | Schema-on-read |
| Usuarios | Analistas de negocio | Data scientists, engineers |
| Procesamiento | SQL | SQL, Python, Spark |
| Costo | 💰💰💰 Alto | 💰 Bajo |
| Performance | 🟢 Rápido | 🟡 Depende |
| Flexibilidad | 🔴 Rígido | 🟢 Muy flexible |
| Casos de uso | BI, reporting | ML, exploración, archives |
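Para aterrizar la fila "Schema" de la tabla, un sketch en PySpark del contraste schema-on-read vs schema-on-write (rutas, credenciales y nombres de tablas son supuestos ilustrativos): en el lake el schema se aplica recién al leer el dato crudo; en el warehouse la tabla destino ya tiene un schema definido y la carga falla si los datos no lo cumplen:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType

spark = SparkSession.builder.appName("SchemaDemo").getOrCreate()

# Schema-on-read (data lake): el JSON crudo vive en S3 y el schema se impone al leer
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("order_date", DateType()),
    StructField("total_amount", DecimalType(10, 2))
])
df_lake = spark.read.schema(schema).json("s3://datalake/raw/orders/")

# Schema-on-write (data warehouse): se carga a una tabla con schema ya definido
df_lake.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://warehouse.example.com:5432/analytics") \
    .option("dbtable", "fct_orders") \
    .option("user", "warehouse_user") \
    .option("password", "password") \
    .mode("append") \
    .save()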
Arquitectura Data Lakehouse (lo mejor de ambos):
┌─────────────────────────────────────────────────────────────┐
│ Data Sources │
├─────────────────────────────────────────────────────────────┤
│ Databases │ APIs │ SaaS │ Logs │ IoT │ Files │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Raw Layer (Data Lake) │
├─────────────────────────────────────────────────────────────┤
│ S3/Azure Data Lake/GCS │
│ • Formato: JSON, CSV, Avro, Parquet │
│ • Sin transformación │
│ • Particionado por fecha │
└──────────────────────┬──────────────────────────────────────┘
│
▼ (ETL/ELT)
┌─────────────────────────────────────────────────────────────┐
│ Processed Layer (Bronze → Silver) │
├─────────────────────────────────────────────────────────────┤
│ Delta Lake / Apache Iceberg │
│ • Formato: Parquet con metadata │
│ • Limpiado y validado │
│ • ACID transactions │
└──────────────────────┬──────────────────────────────────────┘
│
▼ (Transformations)
┌─────────────────────────────────────────────────────────────┐
│ Curated Layer (Gold) │
├─────────────────────────────────────────────────────────────┤
│ • Dimensional modeling (star schema) │
│ • Aggregations y métricas │
│ • Optimizado para queries │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Consumption Layer │
├─────────────────────────────────────────────────────────────┤
│ BI Tools │ ML Models │ Apps │ APIs │
└─────────────────────────────────────────────────────────────┘
Medallion Architecture (Bronze → Silver → Gold):
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# ===== BRONZE LAYER: Datos crudos =====
def ingest_to_bronze(source_path: str, bronze_path: str):
"""Cargar datos crudos sin transformación"""
df_raw = spark.read.json(source_path)
# Agregar metadata de ingesta (incluida la fecha usada para particionar)
df_bronze = df_raw \
.withColumn("_ingested_at", current_timestamp()) \
.withColumn("_ingested_date", current_date()) \
.withColumn("_source_file", input_file_name())
# Escribir a Bronze (append only)
df_bronze.write \
.format("delta") \
.mode("append") \
.partitionBy("_ingested_date") \
.save(bronze_path)
print(f"✅ Ingested {df_bronze.count()} records to Bronze")
# ===== SILVER LAYER: Datos limpios =====
def bronze_to_silver(bronze_path: str, silver_path: str):
"""Limpiar y validar datos"""
df_bronze = spark.read.format("delta").load(bronze_path)
# Transformaciones
df_silver = df_bronze \
.dropna(subset=["order_id", "customer_id"]) \
.dropDuplicates(["order_id"]) \
.withColumn("order_date", to_date("order_date")) \
.withColumn("total_amount", col("total_amount").cast("decimal(10,2)")) \
.filter(col("total_amount") > 0) \
.select(
"order_id",
"customer_id",
"order_date",
"total_amount",
"status",
"_ingested_at"
)
# Merge incremental (upsert)
if DeltaTable.isDeltaTable(spark, silver_path):
delta_silver = DeltaTable.forPath(spark, silver_path)
delta_silver.alias("target").merge(
df_silver.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
df_silver.write \
.format("delta") \
.mode("overwrite") \
.save(silver_path)
print(f"✅ Processed {df_silver.count()} records to Silver")
# ===== GOLD LAYER: Datos agregados para analytics =====
def silver_to_gold(silver_path: str, gold_path: str):
"""Crear tablas agregadas para BI"""
df_silver = spark.read.format("delta").load(silver_path)
# Métrica: Revenue por día
df_daily_revenue = df_silver \
.groupBy("order_date") \
.agg(
count("order_id").alias("total_orders"),
sum("total_amount").alias("total_revenue"),
avg("total_amount").alias("avg_order_value")
) \
.orderBy("order_date")
df_daily_revenue.write \
.format("delta") \
.mode("overwrite") \
.save(f"{gold_path}/daily_revenue")
# Métrica: Customer lifetime value
window_spec = Window.partitionBy("customer_id").orderBy("order_date")
df_customer_ltv = df_silver \
.withColumn("cumulative_revenue", sum("total_amount").over(window_spec)) \
.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
max("cumulative_revenue").alias("lifetime_value"),
max("order_date").alias("last_order_date")
)
df_customer_ltv.write \
.format("delta") \
.mode("overwrite") \
.save(f"{gold_path}/customer_ltv")
print("✅ Created Gold layer tables")
# Pipeline completo
def run_medallion_pipeline():
"""Ejecutar pipeline completo Bronze → Silver → Gold"""
# Paths
source_path = "s3://raw-data/orders/*.json"
bronze_path = "s3://datalake/bronze/orders"
silver_path = "s3://datalake/silver/orders"
gold_path = "s3://datalake/gold"
# Execute layers
ingest_to_bronze(source_path, bronze_path)
bronze_to_silver(bronze_path, silver_path)
silver_to_gold(silver_path, gold_path)
print("\n✅ Medallion pipeline completed")
run_medallion_pipeline()
Time Travel con Delta Lake:
# Leer versión específica
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("path")
# Leer snapshot en timestamp
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-15") \
.load("path")
# Ver historia de cambios
delta_table = DeltaTable.forPath(spark, "path")
delta_table.history().show()
# Rollback a versión anterior
delta_table.restoreToVersion(5)
Receta 5.5: Stream Processing con Kafka
¿Qué es Kafka? Plataforma distribuida de streaming para publicar, suscribir y procesar flujos de datos en tiempo real.
Conceptos clave:
- Topic: Canal de mensajes (como tabla de database)
- Producer: Publica mensajes al topic
- Consumer: Lee mensajes del topic
- Partition: División del topic para paralelismo
- Offset: Posición del mensaje en la partition
- Consumer Group: Grupo de consumers entre los que se reparten las partitions del topic
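Un sketch didáctico de cómo la key determina la partition: el particionador por defecto de Kafka aplica un hash murmur2 sobre la key y toma el módulo del número de partitions; aquí se imita la idea con md5 solo para ilustrar por qué todos los eventos de un mismo customer_id caen en la misma partition y conservan su orden relativo:
import hashlib

def particion_para(key: str, num_partitions: int = 3) -> int:
    """Imitación didáctica del particionador por key (Kafka real usa murmur2, no md5)."""
    digest = int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)
    return digest % num_partitions

for customer in ['customer_1', 'customer_2', 'customer_1', 'customer_3', 'customer_1']:
    print(customer, '-> partition', particion_para(customer))
# customer_1 cae siempre en la misma partition, por eso sus eventos mantienen el orden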
Producer (Python):
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
from datetime import datetime
import time
class SalesEventProducer:
"""Producer de eventos de ventas"""
def __init__(self, bootstrap_servers=['localhost:9092']):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all', # Esperar confirmación de todos los replicas
retries=3,
max_in_flight_requests_per_connection=1 # Garantizar orden
)
self.topic = 'sales-events'
def send_event(self, event: dict):
"""Enviar evento de venta"""
# Key para partitioning (todos los eventos del mismo customer van a la misma partition)
key = event.get('customer_id')
# Agregar timestamp
event['timestamp'] = datetime.now().isoformat()
# Send async
future = self.producer.send(
self.topic,
key=key,
value=event
)
# Esperar la confirmación de forma síncrona para manejar success/error
try:
record_metadata = future.get(timeout=10)
print(f"✅ Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")
except KafkaError as e:
print(f"❌ Failed to send: {e}")
def send_batch(self, events: list):
"""Enviar múltiples eventos"""
for event in events:
self.send_event(event)
# Flush para asegurar que todos se enviaron
self.producer.flush()
def close(self):
"""Cerrar producer"""
self.producer.close()
# Uso
producer = SalesEventProducer()
# Simular eventos
for i in range(100):
event = {
'order_id': f'order_{i}',
'customer_id': f'customer_{i % 10}', # 10 customers
'product_id': f'product_{i % 5}',
'quantity': 1,
'amount': 99.99
}
producer.send_event(event)
time.sleep(0.1)
producer.close()
Consumer (Python):
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
class SalesEventConsumer:
"""Consumer de eventos de ventas"""
def __init__(
self,
bootstrap_servers=['localhost:9092'],
group_id='sales-processor'
):
self.consumer = KafkaConsumer(
'sales-events',
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='earliest', # 'earliest' o 'latest'
enable_auto_commit=False, # Manual commit para control
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
max_poll_records=100 # Batch size
)
def process_events(self):
"""Procesar eventos continuamente"""
try:
for message in self.consumer:
# Obtener evento
event = message.value
print(f"\n📨 Received from partition {message.partition}, offset {message.offset}")
print(f"Event: {event}")
# Procesar evento
try:
self.process_single_event(event)
# Commit offset después de procesar exitosamente
self.consumer.commit()
except Exception as e:
print(f"❌ Error processing event: {e}")
# No hacer commit, re-procesar en siguiente poll
except KeyboardInterrupt:
print("\n🛑 Stopping consumer...")
finally:
self.consumer.close()
def process_single_event(self, event: dict):
"""Procesar un evento individual"""
# Validar evento
if not event.get('order_id'):
raise ValueError("Missing order_id")
# Business logic
print(f"Processing order {event['order_id']} for customer {event['customer_id']}")
# Guardar en database, enviar notificación, etc.
# save_to_database(event)
# send_notification(event)
def process_batch(self):
"""Procesar eventos en batch"""
batch = []
batch_size = 100
try:
for message in self.consumer:
batch.append(message.value)
if len(batch) >= batch_size:
# Procesar batch
self.process_batch_events(batch)
# Commit
self.consumer.commit()
batch = []
except KeyboardInterrupt:
# Procesar batch restante
if batch:
self.process_batch_events(batch)
self.consumer.commit()
finally:
self.consumer.close()
def process_batch_events(self, events: list):
"""Procesar batch de eventos"""
print(f"📦 Processing batch of {len(events)} events")
# Bulk insert a database, etc.
# bulk_insert_to_db(events)
# Uso
consumer = SalesEventConsumer()
consumer.process_events()
Stream Processing con Kafka Streams (conceptual en Python):
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime, timedelta
class StreamProcessor:
"""Procesamiento de streams con agregaciones"""
def __init__(self):
self.consumer = KafkaConsumer(
'sales-events',
bootstrap_servers=['localhost:9092'],
group_id='stream-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# State store (en producción: usar RocksDB o similar)
self.state = defaultdict(lambda: {
'total_amount': 0,
'count': 0,
'window_start': None
})
def process_stream(self):
"""Procesar stream con agregaciones en ventana de tiempo"""
window_duration = timedelta(minutes=5)
for message in self.consumer:
event = message.value
customer_id = event['customer_id']
# Actualizar state (el inicio de la ventana se fija con el primer evento del cliente)
self.state[customer_id]['window_start'] = self.state[customer_id]['window_start'] or datetime.now()
self.state[customer_id]['total_amount'] += event['amount']
self.state[customer_id]['count'] += 1
# Verificar si la ventana expiró
current_time = datetime.now()
if current_time - self.state[customer_id]['window_start'] >= window_duration:
# Emitir resultado agregado
result = {
'customer_id': customer_id,
'window_start': self.state[customer_id]['window_start'].isoformat(),
'window_end': current_time.isoformat(),
'total_amount': self.state[customer_id]['total_amount'],
'order_count': self.state[customer_id]['count'],
'avg_order_value': self.state[customer_id]['total_amount'] / self.state[customer_id]['count']
}
# Enviar a topic de resultados
self.producer.send('customer-metrics', value=result)
# Reset state
self.state[customer_id] = {
'total_amount': 0,
'count': 0,
'window_start': current_time
}
# Commit
self.consumer.commit()
# Uso
processor = StreamProcessor()
processor.process_stream()
Kafka con Apache Flink (procesamiento avanzado):
# Ejemplo conceptual con PyFlink (la API nativa de Flink es Java/Scala; el aggregator y el sink se omiten)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time
import json
# Setup environment
env = StreamExecutionEnvironment.get_execution_environment()
# Kafka source
kafka_props = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'flink-consumer'
}
kafka_source = FlinkKafkaConsumer(
topics='sales-events',
deserialization_schema=SimpleStringSchema(),
properties=kafka_props
)
# Stream pipeline
stream = env.add_source(kafka_source)
# Parse JSON y key by customer
parsed_stream = stream.map(lambda x: json.loads(x)) \
.key_by(lambda x: x['customer_id'])
# Windowed aggregation
result_stream = parsed_stream \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.aggregate(CustomerRevenueAggregator())
# Sink to output
result_stream.add_sink(kafka_sink)
# Execute
env.execute("Sales Stream Processing")
SQL y Python para Data Engineering
Receta 5.6: SQL Avanzado - Window Functions y CTEs
Window Functions para Analytics:
-- ===== RANKING =====
-- ROW_NUMBER: Numeración única sin empates
SELECT
customer_id,
order_date,
total_amount,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY order_date DESC
) as order_number
FROM orders;
-- RANK: Con empates (gaps en numeración)
SELECT
product_name,
category,
revenue,
RANK() OVER (
PARTITION BY category
ORDER BY revenue DESC
) as revenue_rank
FROM products;
-- DENSE_RANK: Con empates (sin gaps)
SELECT
salesperson,
sales_amount,
DENSE_RANK() OVER (
ORDER BY sales_amount DESC
) as rank
FROM sales;
-- ===== DISTRIBUCIÓN =====
-- NTILE: Dividir en N grupos
SELECT
customer_id,
total_revenue,
NTILE(4) OVER (
ORDER BY total_revenue DESC
) as quartile
FROM customer_summary;
-- PERCENT_RANK: Percentil relativo
SELECT
product_id,
price,
PERCENT_RANK() OVER (
ORDER BY price
) as price_percentile
FROM products;
-- ===== AGREGACIONES CON VENTANAS =====
-- Running total
SELECT
order_date,
daily_revenue,
SUM(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_revenue
FROM daily_sales;
-- Moving average (últimos 7 días)
SELECT
sale_date,
revenue,
AVG(revenue) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7day
FROM daily_revenue;
-- Year-over-Year comparison
SELECT
sale_date,
revenue,
LAG(revenue, 365) OVER (ORDER BY sale_date) as revenue_last_year,
revenue - LAG(revenue, 365) OVER (ORDER BY sale_date) as yoy_change,
ROUND(
(revenue - LAG(revenue, 365) OVER (ORDER BY sale_date)) /
LAG(revenue, 365) OVER (ORDER BY sale_date) * 100,
2
) as yoy_change_pct
FROM daily_revenue;
-- ===== LEAD y LAG =====
-- Siguiente y anterior valor
SELECT
customer_id,
order_date,
total_amount,
LAG(order_date) OVER (
PARTITION BY customer_id
ORDER BY order_date
) as previous_order_date,
LEAD(order_date) OVER (
PARTITION BY customer_id
ORDER BY order_date
) as next_order_date,
order_date - LAG(order_date) OVER (
PARTITION BY customer_id
ORDER BY order_date
) as days_since_last_order
FROM orders;
-- ===== FIRST_VALUE y LAST_VALUE =====
-- Comparar con primer y último valor de la ventana
SELECT
product_id,
sale_date,
price,
FIRST_VALUE(price) OVER (
PARTITION BY product_id
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as initial_price,
LAST_VALUE(price) OVER (
PARTITION BY product_id
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as current_price
FROM product_prices;
CTEs (Common Table Expressions):
-- ===== CTE SIMPLE =====
-- Calcular métricas intermedias
WITH customer_metrics AS (
SELECT
customer_id,
COUNT(*) as total_orders,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
MAX(order_date) as last_order_date
FROM orders
GROUP BY customer_id
)
SELECT
c.customer_name,
c.email,
cm.total_orders,
cm.total_revenue,
cm.avg_order_value,
CURRENT_DATE - cm.last_order_date as days_since_last_order
FROM customers c
JOIN customer_metrics cm ON c.id = cm.customer_id
WHERE cm.total_revenue > 1000
ORDER BY cm.total_revenue DESC;
-- ===== MÚLTIPLES CTEs =====
WITH
-- CTE 1: Ventas mensuales
monthly_sales AS (
SELECT
DATE_TRUNC('month', order_date) as month,
SUM(total_amount) as revenue,
COUNT(*) as order_count
FROM orders
GROUP BY DATE_TRUNC('month', order_date)
),
-- CTE 2: Agregar métricas derivadas
sales_with_metrics AS (
SELECT
month,
revenue,
order_count,
revenue / order_count as avg_order_value,
LAG(revenue) OVER (ORDER BY month) as prev_month_revenue
FROM monthly_sales
),
-- CTE 3: Calcular growth
sales_with_growth AS (
SELECT
month,
revenue,
prev_month_revenue,
CASE
WHEN prev_month_revenue IS NOT NULL THEN
ROUND((revenue - prev_month_revenue) / prev_month_revenue * 100, 2)
ELSE NULL
END as mom_growth_pct
FROM sales_with_metrics
)
SELECT *
FROM sales_with_growth
WHERE mom_growth_pct IS NOT NULL
ORDER BY month DESC;
-- ===== CTE RECURSIVO: Jerarquías =====
-- Organigrama de empleados
WITH RECURSIVE employee_hierarchy AS (
-- Caso base: CEO (sin manager)
SELECT
id,
name,
manager_id,
1 as level,
name::TEXT as path
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Caso recursivo: empleados con managers
SELECT
e.id,
e.name,
e.manager_id,
eh.level + 1,
eh.path || ' > ' || e.name
FROM employees e
INNER JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
id,
name,
level,
path
FROM employee_hierarchy
ORDER BY level, name;
-- Series de fechas
WITH RECURSIVE date_series AS (
SELECT DATE '2024-01-01' as date
UNION ALL
SELECT (date + INTERVAL '1 day')::DATE  -- mantener el tipo DATE en el término recursivo
FROM date_series
WHERE date < DATE '2024-12-31'
)
SELECT
ds.date,
COALESCE(SUM(o.total_amount), 0) as revenue
FROM date_series ds
LEFT JOIN orders o ON DATE(o.order_date) = ds.date
GROUP BY ds.date
ORDER BY ds.date;
Queries complejos para Data Engineering:
-- ===== COHORT ANALYSIS =====
-- Análisis de retención por cohorte
WITH first_purchase AS (
SELECT
customer_id,
MIN(DATE_TRUNC('month', order_date)) as cohort_month
FROM orders
GROUP BY customer_id
),
customer_orders AS (
SELECT
o.customer_id,
fp.cohort_month,
DATE_TRUNC('month', o.order_date) as order_month,
EXTRACT(YEAR FROM AGE(o.order_date, fp.cohort_month)) * 12 +
EXTRACT(MONTH FROM AGE(o.order_date, fp.cohort_month)) as months_since_first
FROM orders o
JOIN first_purchase fp ON o.customer_id = fp.customer_id
)
SELECT
cohort_month,
months_since_first,
COUNT(DISTINCT customer_id) as customers,
COUNT(*) as orders
FROM customer_orders
GROUP BY cohort_month, months_since_first
ORDER BY cohort_month, months_since_first;
-- ===== FUNNEL ANALYSIS =====
-- Embudo de conversión
WITH funnel_steps AS (
SELECT
user_id,
MAX(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as viewed,
MAX(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as added_to_cart,
MAX(CASE WHEN event_type = 'checkout' THEN 1 ELSE 0 END) as checked_out,
MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchased
FROM events
WHERE event_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY user_id
)
SELECT
COUNT(*) as total_users,
SUM(viewed) as step1_viewed,
SUM(added_to_cart) as step2_add_to_cart,
SUM(checked_out) as step3_checkout,
SUM(purchased) as step4_purchase,
ROUND(SUM(added_to_cart)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as conversion_to_cart,
ROUND(SUM(purchased)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as overall_conversion
FROM funnel_steps;
-- ===== RFM ANALYSIS (Recency, Frequency, Monetary) =====
WITH customer_rfm AS (
SELECT
customer_id,
-- Recency: días desde última compra
CURRENT_DATE - MAX(order_date) as recency,
-- Frequency: número de órdenes
COUNT(*) as frequency,
-- Monetary: revenue total
SUM(total_amount) as monetary
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '1 year'
GROUP BY customer_id
),
rfm_scores AS (
SELECT
customer_id,
recency,
frequency,
monetary,
-- Scores de 1-5 basados en quintiles
NTILE(5) OVER (ORDER BY recency DESC) as r_score, -- Menor recency = mejor
NTILE(5) OVER (ORDER BY frequency ASC) as f_score,
NTILE(5) OVER (ORDER BY monetary ASC) as m_score
FROM customer_rfm
)
SELECT
customer_id,
CASE
WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champions'
WHEN r_score >= 3 AND f_score >= 3 THEN 'Loyal Customers'
WHEN r_score >= 4 AND f_score <= 2 THEN 'New Customers'
WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
ELSE 'Others'
END as segment,
r_score,
f_score,
m_score
FROM rfm_scores;
Receta 5.7: Python para Data Engineering
Pandas - Manipulación de Datos:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# ===== LEER DATOS =====
# CSV
df = pd.read_csv('sales.csv', parse_dates=['order_date'])
# Excel
df = pd.read_excel('data.xlsx', sheet_name='Sales')
# JSON
df = pd.read_json('data.json')
# SQL
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@host:5432/db')
df = pd.read_sql("SELECT * FROM orders", engine)
# Parquet
df = pd.read_parquet('data.parquet')
# ===== EXPLORACIÓN =====
# Info básica
print(df.head())
print(df.info())
print(df.describe())
# Valores únicos
print(df['category'].unique())
print(df['category'].value_counts())
# Valores nulos
print(df.isnull().sum())
print(df.isnull().sum() / len(df) * 100) # Porcentaje
# ===== LIMPIEZA =====
# Remover duplicados
df_clean = df.drop_duplicates(subset=['order_id'])
# Remover nulls
df_clean = df.dropna(subset=['customer_id', 'total_amount'])
# Rellenar nulls
df_filled = df.fillna({
'discount': 0,
'notes': 'No notes',
'category': 'Uncategorized'
})
# Forward fill (usar valor anterior)
df['price'] = df['price'].ffill()
# Backward fill
df['price'] = df['price'].bfill()
# Interpolar valores numéricos
df['sales'] = df['sales'].interpolate(method='linear')
# ===== TRANSFORMACIONES =====
# Crear nuevas columnas
df['revenue'] = df['quantity'] * df['price']
df['order_year'] = df['order_date'].dt.year
df['order_month'] = df['order_date'].dt.month
df['day_of_week'] = df['order_date'].dt.day_name()
# Apply function
def categorize_amount(amount):
if amount < 100:
return 'Low'
elif amount < 500:
return 'Medium'
else:
return 'High'
df['amount_category'] = df['total_amount'].apply(categorize_amount)
# Apply con lambda
df['customer_name_upper'] = df['customer_name'].apply(lambda x: x.upper())
# Map con diccionario
category_map = {'A': 'Premium', 'B': 'Standard', 'C': 'Basic'}
df['tier'] = df['category'].map(category_map)
# String operations
df['email'] = df['email'].str.lower().str.strip()
df['product_name'] = df['product_name'].str.title()
# ===== AGREGACIONES =====
# Group by simple
summary = df.groupby('customer_id').agg({
'order_id': 'count',
'total_amount': ['sum', 'mean', 'max'],
'order_date': 'max'
}).reset_index()
# Renombrar columnas después de agg
summary.columns = [
'customer_id',
'total_orders',
'total_revenue',
'avg_order_value',
'max_order_value',
'last_order_date'
]
# Group by múltiples columnas
monthly_summary = df.groupby([
df['order_date'].dt.to_period('M'),
'product_category'
]).agg({
'total_amount': 'sum',
'order_id': 'count'
}).reset_index()
# Custom aggregation function
def revenue_per_customer(group):
return group['total_amount'].sum() / group['customer_id'].nunique()
df.groupby('product_category').apply(revenue_per_customer)
# ===== PIVOT TABLES =====
# Pivot
pivot = df.pivot_table(
values='total_amount',
index='customer_id',
columns='product_category',
aggfunc='sum',
fill_value=0
)
# Multiple aggregations
pivot_multi = df.pivot_table(
values='total_amount',
index='order_date',
columns='product_category',
aggfunc=['sum', 'count', 'mean']
)
# ===== MERGE Y JOIN =====
# Inner join
df_merged = pd.merge(
df_orders,
df_customers,
left_on='customer_id',
right_on='id',
how='inner'
)
# Left join
df_all_orders = pd.merge(
df_orders,
df_customers,
left_on='customer_id',
right_on='id',
how='left'
)
# Multiple joins
df_complete = df_orders \
.merge(df_customers, left_on='customer_id', right_on='id') \
.merge(df_products, left_on='product_id', right_on='id')
# ===== WINDOW FUNCTIONS =====
# Ranking
df['revenue_rank'] = df.groupby('customer_id')['total_amount'].rank(
ascending=False,
method='dense'
)
# Cumulative sum
df['cumulative_revenue'] = df.groupby('customer_id')['total_amount'].cumsum()
# Moving average
df['moving_avg_3'] = df.groupby('customer_id')['total_amount'] \
.transform(lambda x: x.rolling(window=3, min_periods=1).mean())
# Shift (lag/lead)
df['prev_order_amount'] = df.groupby('customer_id')['total_amount'].shift(1)
df['next_order_amount'] = df.groupby('customer_id')['total_amount'].shift(-1)
# ===== TIME SERIES =====
# Resample (agregación temporal)
df.set_index('order_date', inplace=True)
# Daily to monthly
monthly = df.resample('M').agg({
'total_amount': 'sum',
'order_id': 'count'
})
# Rolling window
df['7day_avg'] = df['total_amount'].rolling(window=7).mean()
# ===== CASO DE USO COMPLETO =====
class DataPipeline:
"""Pipeline completo de procesamiento con Pandas"""
def __init__(self, input_path: str):
self.input_path = input_path
self.df = None
def extract(self):
"""Extraer datos"""
print("📥 Extracting data...")
self.df = pd.read_csv(self.input_path, parse_dates=['order_date'])
print(f"✅ Loaded {len(self.df)} rows")
return self
def clean(self):
"""Limpiar datos"""
print("🧹 Cleaning data...")
# Remover duplicados
before = len(self.df)
self.df = self.df.drop_duplicates(subset=['order_id'])
print(f" Removed {before - len(self.df)} duplicates")
# Remover nulls en columnas críticas
before = len(self.df)
self.df = self.df.dropna(subset=['customer_id', 'total_amount'])
print(f" Removed {before - len(self.df)} rows with nulls")
# Rellenar otros nulls
self.df['discount'] = self.df['discount'].fillna(0)
# Normalizar strings
self.df['customer_email'] = self.df['customer_email'].str.lower().str.strip()
# Filtrar valores inválidos
before = len(self.df)
self.df = self.df[self.df['total_amount'] > 0]
print(f" Removed {before - len(self.df)} invalid amounts")
print(f"✅ Clean dataset: {len(self.df)} rows")
return self
def transform(self):
"""Transformar y enriquecer datos"""
print("⚙️ Transforming data...")
# Columnas derivadas
self.df['revenue'] = self.df['quantity'] * self.df['price']
self.df['order_year'] = self.df['order_date'].dt.year
self.df['order_month'] = self.df['order_date'].dt.month
# Categorización
self.df['revenue_tier'] = pd.cut(
self.df['total_amount'],
bins=[0, 100, 500, 1000, float('inf')],
labels=['Bronze', 'Silver', 'Gold', 'Platinum']
)
# Agregaciones por cliente
customer_metrics = self.df.groupby('customer_id').agg({
'order_id': 'count',
'total_amount': 'sum'
}).rename(columns={
'order_id': 'total_orders',
'total_amount': 'customer_ltv'
})
self.df = self.df.merge(customer_metrics.reset_index(), on='customer_id', how='left')
print("✅ Transformation complete")
return self
def validate(self):
"""Validar calidad de datos"""
print("✓ Validating data quality...")
# Check 1: No nulls en columnas críticas
critical_cols = ['order_id', 'customer_id', 'total_amount']
nulls = self.df[critical_cols].isnull().sum()
assert nulls.sum() == 0, f"Found nulls in critical columns: {nulls}"
# Check 2: No duplicados
duplicates = self.df.duplicated(subset=['order_id']).sum()
assert duplicates == 0, f"Found {duplicates} duplicate order_ids"
# Check 3: Valores positivos
assert (self.df['total_amount'] > 0).all(), "Found negative amounts"
# Check 4: Fechas válidas
assert (self.df['order_date'] <= datetime.now()).all(), "Found future dates"
print("✅ All validation checks passed")
return self
def load(self, output_path: str):
"""Guardar datos procesados"""
print(f"💾 Loading to {output_path}...")
self.df.to_parquet(
output_path,
index=False,
compression='snappy',
partition_cols=['order_year', 'order_month']
)
print(f"✅ Saved {len(self.df)} rows")
return self
def run(self, output_path: str):
"""Ejecutar pipeline completo"""
print(f"\n{'='*60}")
print(f"🚀 Starting Pandas ETL Pipeline")
print(f"{'='*60}\n")
self.extract() \
.clean() \
.transform() \
.validate() \
.load(output_path)
print(f"\n{'='*60}")
print(f"✅ Pipeline completed successfully")
print(f"{'='*60}\n")
# Uso
pipeline = DataPipeline('raw_sales.csv')
pipeline.run('processed_sales.parquet')
NumPy - Operaciones Numéricas:
import numpy as np
# ===== ARRAYS =====
# Crear arrays
arr = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
# Arrays especiales
zeros = np.zeros((3, 4))
ones = np.ones((2, 3))
identity = np.eye(3)
random = np.random.rand(3, 3)
# Range
arr = np.arange(0, 10, 2) # [0, 2, 4, 6, 8]
arr = np.linspace(0, 1, 5) # [0, 0.25, 0.5, 0.75, 1]
# ===== OPERACIONES =====
# Elementwise
arr = np.array([1, 2, 3])
arr * 2 # [2, 4, 6]
arr + arr # [2, 4, 6]
arr ** 2 # [1, 4, 9]
# Agregaciones
arr.sum()
arr.mean()
arr.std()
arr.max()
arr.min()
# Axis-wise operations
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_2d.sum(axis=0) # [5, 7, 9] - columnas
arr_2d.sum(axis=1) # [6, 15] - filas
# ===== INDEXING Y SLICING =====
arr = np.array([1, 2, 3, 4, 5])
arr[0] # 1
arr[-1] # 5
arr[1:4] # [2, 3, 4]
arr[arr > 2] # [3, 4, 5] - boolean indexing
# ===== BROADCASTING =====
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_1d = np.array([10, 20, 30])
result = arr_2d + arr_1d # Broadcasting
# [[11, 22, 33],
# [14, 25, 36]]
# ===== CASO DE USO: Procesamiento de métricas =====
def calculate_moving_average(data, window):
"""Calcular moving average eficientemente con NumPy"""
weights = np.ones(window) / window
return np.convolve(data, weights, mode='valid')
# Datos de ventas diarias
sales = np.random.randint(100, 500, size=365)
# Moving average de 7 días
ma_7 = calculate_moving_average(sales, 7)
# Detectar outliers
mean = sales.mean()
std = sales.std()
outliers = sales[(sales < mean - 2*std) | (sales > mean + 2*std)]
Receta 5.8: Data Quality - Validación y Limpieza
Great Expectations - Framework de validación:
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
import pandas as pd
# ===== SETUP =====
# Inicializar contexto
context = gx.get_context()
# Crear expectation suite
suite = context.create_expectation_suite(
expectation_suite_name="sales_data_quality",
overwrite_existing=True
)
# ===== EXPECTATIVAS =====
# Tabla level
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 1,
"max_value": 1000000
}
)
)
# Columna existe
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_to_exist",
kwargs={"column": "order_id"}
)
)
# No nulls
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "customer_id"}
)
)
# Valores únicos
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_be_unique",
kwargs={"column": "order_id"}
)
)
# Valores en set
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_be_in_set",
kwargs={
"column": "status",
"value_set": ["pending", "completed", "cancelled"]
}
)
)
# Rango numérico
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={
"column": "total_amount",
"min_value": 0,
"max_value": 100000
}
)
)
# Regex pattern
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_match_regex",
kwargs={
"column": "email",
"regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
}
)
)
# ===== VALIDATE THE DATA =====
# Load data
df = pd.read_csv("sales.csv")
# Build a batch request for the in-memory DataFrame (fluent datasource API)
batch_request = context.sources.add_pandas("sales_source").add_dataframe_asset(
name="sales_data"
).build_batch_request(dataframe=df)
# Validate the batch against the suite
# (get_validator/validate as in Great Expectations 0.16-0.18; the 1.x API differs)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite=suite
)
results = validator.validate()
# Inspect the results
if results.success:
print("✅ All validations passed!")
else:
print("❌ Validations failed:")
for result in results.results:
print(result)
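In a real pipeline these results usually act as a hard gate before loading. A minimal sketch (the DataQualityError exception and validate_or_fail helper are illustrative names, not part of Great Expectations):
class DataQualityError(Exception):
    """Raised when a batch fails its expectation suite."""

def validate_or_fail(validator) -> None:
    """Run the suite and abort the pipeline run if any expectation fails."""
    results = validator.validate()
    if not results.success:
        failed = [r for r in results.results if not r.success]
        raise DataQualityError(f"{len(failed)} expectation(s) failed")

# Usage inside a load step: refuse to write unvalidated data downstream
validate_or_fail(validator)
df.to_parquet("validated_sales.parquet", index=False)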
Data Profiling with ydata-profiling (formerly pandas-profiling):
from ydata_profiling import ProfileReport
import pandas as pd
# Load data
df = pd.read_csv("sales.csv")
# Generate the report
profile = ProfileReport(
df,
title="Sales Data Profile Report",
explorative=True,
minimal=False
)
# Save the report as HTML
profile.to_file("sales_profile_report.html")
# Inspect statistics and automatically generated alerts programmatically
# (the exact shape of the description object varies between library versions)
description = profile.get_description()
print(description.alerts)
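For very wide or very large tables the full report can take a while; a lighter pass is often enough for a first look (a sketch using the library's documented minimal mode; file names are assumptions):
from ydata_profiling import ProfileReport
import pandas as pd

df = pd.read_csv("sales.csv")

# minimal=True skips the most expensive computations (correlations,
# interactions, duplicate detection) so profiling stays fast on large tables
quick_profile = ProfileReport(df, title="Sales - quick profile", minimal=True)
quick_profile.to_file("sales_quick_profile.html")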
Outlier detection and handling:
import pandas as pd
import numpy as np
from scipy import stats
class OutlierDetector:
"""Detect and handle outliers in a DataFrame"""
def __init__(self, df: pd.DataFrame):
self.df = df
self.outliers = {}
def detect_iqr(self, column: str, threshold=1.5):
"""IQR (interquartile range) method"""
Q1 = self.df[column].quantile(0.25)
Q3 = self.df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - threshold * IQR
upper_bound = Q3 + threshold * IQR
outliers = self.df[
(self.df[column] < lower_bound) |
(self.df[column] > upper_bound)
]
self.outliers[column] = {
'method': 'IQR',
'count': len(outliers),
'percentage': len(outliers) / len(self.df) * 100,
'lower_bound': lower_bound,
'upper_bound': upper_bound,
'indices': outliers.index.tolist()
}
return outliers
def detect_zscore(self, column: str, threshold=3):
"""Método Z-score"""
z_scores = np.abs(stats.zscore(self.df[column]))
outliers = self.df[z_scores > threshold]
self.outliers[column] = {
'method': 'Z-score',
'count': len(outliers),
'percentage': len(outliers) / len(self.df) * 100,
'threshold': threshold,
'indices': outliers.index.tolist()
}
return outliers
def detect_isolation_forest(self, columns: list):
"""Isolation Forest method (ML-based, multivariate)"""
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(
contamination=0.1,  # expected proportion of outliers (10%)
random_state=42
)
predictions = iso_forest.fit_predict(self.df[columns])
outliers = self.df[predictions == -1]
self.outliers['multivariate'] = {
'method': 'Isolation Forest',
'count': len(outliers),
'percentage': len(outliers) / len(self.df) * 100,
'indices': outliers.index.tolist()
}
return outliers
def remove_outliers(self, column: str, method='iqr'):
"""Drop the rows flagged as outliers (run the matching detect_* method first)"""
if method == 'iqr':
outlier_indices = self.outliers[column]['indices']
return self.df.drop(outlier_indices)
raise ValueError(f"Unsupported method: {method}")
def cap_outliers(self, column: str):
"""Cap outliers (winsorization)"""
info = self.outliers[column]
self.df[column] = self.df[column].clip(
lower=info['lower_bound'],
upper=info['upper_bound']
)
return self.df
def summary(self):
"""Resumen de outliers detectados"""
for col, info in self.outliers.items():
print(f"\n=== {col} ===")
print(f"Method: {info['method']}")
print(f"Count: {info['count']}")
print(f"Percentage: {info['percentage']:.2f}%")
# Usage
df = pd.read_csv("sales.csv")
detector = OutlierDetector(df)
# Detect with different methods
# (each detect_* call overwrites the stored result for that column,
#  so run the method you plan to act on last)
detector.detect_zscore('total_amount')
detector.detect_iqr('total_amount')
# Show a summary
detector.summary()
# Remove the IQR outliers
df_clean = detector.remove_outliers('total_amount', method='iqr')
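The multivariate Isolation Forest detector defined in the class can be exercised the same way (a sketch; it assumes scikit-learn is installed and that the DataFrame has a numeric quantity column alongside total_amount):
# Multivariate detection: rows that look anomalous across several columns at once
multivariate_outliers = detector.detect_isolation_forest(['total_amount', 'quantity'])
print(f"Flagged {len(multivariate_outliers)} multivariate outliers")
detector.summary()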
Complete data cleaning pipeline:
import pandas as pd
import numpy as np
from datetime import datetime
class DataCleaner:
"""Pipeline de limpieza de datos completo"""
def __init__(self, df: pd.DataFrame):
self.df = df.copy()
self.cleaning_log = []
def log(self, message: str):
"""Log de operaciones"""
self.cleaning_log.append({
'timestamp': datetime.now(),
'message': message
})
print(f" {message}")
def handle_missing_values(self, strategies: dict):
"""
Handle missing values according to a per-column strategy, e.g.:
strategies = {
'column1': 'drop',
'column2': 'mean',
'column3': 'mode',
'column4': 'forward_fill',
'column5': 0
}
"""
print("\n🧹 Handling missing values...")
for col, strategy in strategies.items():
if col not in self.df.columns:
continue
missing_count = self.df[col].isnull().sum()
if missing_count == 0:
continue
if strategy == 'drop':
before = len(self.df)
self.df = self.df.dropna(subset=[col])
self.log(f"Dropped {before - len(self.df)} rows with null {col}")
elif strategy == 'mean':
mean_val = self.df[col].mean()
self.df[col] = self.df[col].fillna(mean_val)
self.log(f"Filled {missing_count} nulls in {col} with mean ({mean_val:.2f})")
elif strategy == 'median':
median_val = self.df[col].median()
self.df[col] = self.df[col].fillna(median_val)
self.log(f"Filled {missing_count} nulls in {col} with median ({median_val:.2f})")
elif strategy == 'mode':
mode_val = self.df[col].mode()[0]
self.df[col] = self.df[col].fillna(mode_val)
self.log(f"Filled {missing_count} nulls in {col} with mode ({mode_val})")
elif strategy == 'forward_fill':
self.df[col] = self.df[col].ffill()
self.log(f"Forward filled {missing_count} nulls in {col}")
else:
# Constant fill value
self.df[col] = self.df[col].fillna(strategy)
self.log(f"Filled {missing_count} nulls in {col} with {strategy}")
return self
def remove_duplicates(self, subset=None, keep='first'):
"""Remover duplicados"""
print("\n🔍 Removing duplicates...")
before = len(self.df)
self.df = self.df.drop_duplicates(subset=subset, keep=keep)
removed = before - len(self.df)
if removed > 0:
self.log(f"Removed {removed} duplicate rows")
else:
self.log("No duplicates found")
return self
def standardize_text(self, columns: list):
"""Estandarizar columnas de texto"""
print("\n✏️ Standardizing text...")
for col in columns:
if col not in self.df.columns:
continue
# Strip whitespace
self.df[col] = self.df[col].str.strip()
# Lowercase
self.df[col] = self.df[col].str.lower()
# Remove extra spaces
self.df[col] = self.df[col].str.replace(r'\s+', ' ', regex=True)
self.log(f"Standardized text in {col}")
return self
def fix_data_types(self, type_map: dict):
"""Corregir tipos de datos"""
print("\n🔧 Fixing data types...")
for col, dtype in type_map.items():
if col not in self.df.columns:
continue
try:
if dtype == 'datetime':
self.df[col] = pd.to_datetime(self.df[col])
else:
self.df[col] = self.df[col].astype(dtype)
self.log(f"Converted {col} to {dtype}")
except Exception as e:
self.log(f"Failed to convert {col} to {dtype}: {str(e)}")
return self
def remove_outliers(self, columns: list, method='iqr', threshold=1.5):
"""Remover outliers"""
print(f"\n📊 Removing outliers using {method} method...")
for col in columns:
if col not in self.df.columns:
continue
before = len(self.df)
if method == 'iqr':
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - threshold * IQR
upper = Q3 + threshold * IQR
self.df = self.df[
(self.df[col] >= lower) &
(self.df[col] <= upper)
]
removed = before - len(self.df)
if removed > 0:
self.log(f"Removed {removed} outliers from {col}")
return self
def validate_ranges(self, validations: dict):
"""Validar rangos de valores"""
print("\n✓ Validating value ranges...")
for col, (min_val, max_val) in validations.items():
if col not in self.df.columns:
continue
before = len(self.df)
self.df = self.df[
(self.df[col] >= min_val) &
(self.df[col] <= max_val)
]
removed = before - len(self.df)
if removed > 0:
self.log(f"Removed {removed} rows with {col} outside [{min_val}, {max_val}]")
return self
def get_cleaned_data(self):
"""Return the cleaned DataFrame"""
return self.df
def get_cleaning_report(self):
"""Return the cleaning log as a DataFrame"""
return pd.DataFrame(self.cleaning_log)
# ===== USAGE =====
# Load raw data
df = pd.read_csv("raw_sales.csv")
# Cleaning pipeline
cleaner = DataCleaner(df)
df_clean = cleaner \
.handle_missing_values({
'customer_id': 'drop',
'total_amount': 'median',
'discount': 0,
'category': 'mode'
}) \
.remove_duplicates(subset=['order_id']) \
.standardize_text(['customer_email', 'customer_name']) \
.fix_data_types({
'order_date': 'datetime',
'total_amount': 'float64',
'quantity': 'int32'
}) \
.validate_ranges({
'total_amount': (0, 100000),
'quantity': (1, 1000)
}) \
.remove_outliers(['total_amount'], method='iqr') \
.get_cleaned_data()
# Show the cleaning report
report = cleaner.get_cleaning_report()
print("\n" + "="*60)
print("CLEANING REPORT")
print("="*60)
print(report)
# Save the cleaned data
df_clean.to_parquet('clean_sales.parquet', index=False)
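To close the loop, it helps to persist the cleaning log next to the output and assert a couple of invariants before downstream jobs read the file (a minimal sketch; the file names follow the ones used above):
# Persist the cleaning log alongside the cleaned dataset for auditability
report.to_csv('clean_sales_cleaning_log.csv', index=False)

# Lightweight post-clean sanity checks
assert df_clean['order_id'].is_unique, "order_id should be unique after deduplication"
assert df_clean['total_amount'].between(0, 100000).all(), "total_amount outside validated range"
print(f"✅ Clean dataset ready: {len(df_clean)} rows")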
Congratulations! 🎉
You have completed FASE 5: Data Engineering of the roadmap.
What you have learned:
✅ ETL vs ELT - when to use each approach
✅ Apache Airflow - pipeline orchestration
✅ Apache Spark - distributed processing
✅ Data Warehousing vs Data Lakes
✅ Medallion Architecture (Bronze/Silver/Gold)
✅ Stream processing with Kafka
✅ Advanced SQL - window functions and CTEs
✅ Pandas and NumPy for data engineering
✅ Data quality and validation
✅ Data cleaning pipelines
Next steps:
FASE 6: Artificial Intelligence and AI Collaboration
- AI/ML fundamentals
- Prompt engineering
- Integrating AI into applications
- RAG and Vector Databases
Version: 1.0
Date: 2024
Author: Roadmap del Desarrollador del Futuro
License: Educational use