📊 Developer Cookbook - FASE 5: SQL y Python para Data Engineering
Recetas prácticas para consultas avanzadas y procesamiento de datos con Python
📚 Tabla de Contenidos
Receta 5.6: SQL Avanzado - Window Functions y CTEs
Window Functions para Analytics:
-- ===== RANKING =====
-- ROW_NUMBER: unique sequential numbering, ties never share a value.
-- Here: each customer's orders, newest first (order_number = 1 is the latest).
SELECT
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY order_date DESC
    ) as order_number
FROM orders;

-- RANK: ties share a rank and leave gaps in the numbering (e.g. 1, 1, 3).
-- Ranks products by revenue within each category.
SELECT
    product_name,
    category,
    revenue,
    RANK() OVER (
        PARTITION BY category
        ORDER BY revenue DESC
    ) as revenue_rank
FROM products;
-- DENSE_RANK: ties share a rank with no gaps (e.g. 1, 1, 2).
-- Fix: alias renamed from "rank" — RANK is a reserved word / window
-- function keyword in several dialects (SQL Server, MySQL 8+), so the
-- bare alias breaks portability.
SELECT
    salesperson,
    sales_amount,
    DENSE_RANK() OVER (
        ORDER BY sales_amount DESC
    ) as sales_rank
FROM sales;
-- ===== DISTRIBUTION =====
-- NTILE: split the ordered rows into N equally sized buckets.
-- Quartile 1 = top 25% of customers by revenue (DESC ordering).
SELECT
    customer_id,
    total_revenue,
    NTILE(4) OVER (
        ORDER BY total_revenue DESC
    ) as quartile
FROM customer_summary;

-- PERCENT_RANK: relative percentile in [0, 1];
-- 0 for the cheapest product, 1 for the most expensive.
SELECT
    product_id,
    price,
    PERCENT_RANK() OVER (
        ORDER BY price
    ) as price_percentile
FROM products;
-- ===== WINDOWED AGGREGATES =====
-- Running total. The explicit ROWS frame avoids the default RANGE frame,
-- which would lump together rows sharing the same order_date.
SELECT
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_revenue
FROM daily_sales;

-- Moving average over the current row + 6 preceding rows.
-- NOTE(review): this is 7 *rows*, not 7 calendar days — gaps in
-- sale_date shrink the effective window; confirm daily_revenue has
-- exactly one row per day.
SELECT
    sale_date,
    revenue,
    AVG(revenue) OVER (
        ORDER BY sale_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7day
FROM daily_revenue;
-- Year-over-Year comparison.
-- Fix: NULLIF guards the division against a zero revenue one year ago,
-- which previously raised a divide-by-zero error.
-- NOTE(review): LAG(revenue, 365) assumes exactly one row per calendar
-- day with no gaps, and drifts by a day across leap years — confirm the
-- table is a gap-free daily series.
SELECT
    sale_date,
    revenue,
    LAG(revenue, 365) OVER (ORDER BY sale_date) as revenue_last_year,
    revenue - LAG(revenue, 365) OVER (ORDER BY sale_date) as yoy_change,
    ROUND(
        (revenue - LAG(revenue, 365) OVER (ORDER BY sale_date)) /
        NULLIF(LAG(revenue, 365) OVER (ORDER BY sale_date), 0) * 100,
        2
    ) as yoy_change_pct
FROM daily_revenue;
-- ===== LEAD and LAG =====
-- Previous / next value within each customer's order history.
SELECT
    customer_id,
    order_date,
    total_amount,
    LAG(order_date) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
    ) as previous_order_date,
    LEAD(order_date) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
    ) as next_order_date,
    -- date - date yields an integer day count in PostgreSQL;
    -- NULL for each customer's first order (no previous row)
    order_date - LAG(order_date) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
    ) as days_since_last_order
FROM orders;

-- ===== FIRST_VALUE and LAST_VALUE =====
-- Compare each row against the first and last value of its window.
-- The explicit UNBOUNDED FOLLOWING frame is essential for LAST_VALUE:
-- the default frame ends at CURRENT ROW and would just return the row itself.
SELECT
    product_id,
    sale_date,
    price,
    FIRST_VALUE(price) OVER (
        PARTITION BY product_id
        ORDER BY sale_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as initial_price,
    LAST_VALUE(price) OVER (
        PARTITION BY product_id
        ORDER BY sale_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as current_price
FROM product_prices;
CTEs (Common Table Expressions):
-- ===== SIMPLE CTE =====
-- Pre-aggregate per-customer metrics, then join back to customers
-- to attach names and emails to the aggregates.
WITH customer_metrics AS (
    SELECT
        customer_id,
        COUNT(*) as total_orders,
        SUM(total_amount) as total_revenue,
        AVG(total_amount) as avg_order_value,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
)
SELECT
    c.customer_name,
    c.email,
    cm.total_orders,
    cm.total_revenue,
    cm.avg_order_value,
    -- date - date = integer number of days (PostgreSQL semantics)
    CURRENT_DATE - cm.last_order_date as days_since_last_order
FROM customers c
JOIN customer_metrics cm ON c.id = cm.customer_id
WHERE cm.total_revenue > 1000
ORDER BY cm.total_revenue DESC;
-- ===== MULTIPLE CTEs =====
WITH
-- CTE 1: revenue and order count per calendar month
monthly_sales AS (
    SELECT
        DATE_TRUNC('month', order_date) as month,
        SUM(total_amount) as revenue,
        COUNT(*) as order_count
    FROM orders
    GROUP BY DATE_TRUNC('month', order_date)
),
-- CTE 2: derived metrics (avg order value, previous month's revenue)
sales_with_metrics AS (
    SELECT
        month,
        revenue,
        order_count,
        revenue / order_count as avg_order_value,
        LAG(revenue) OVER (ORDER BY month) as prev_month_revenue
    FROM monthly_sales
),
-- CTE 3: month-over-month growth.
-- Fix: NULLIF guards against a zero-revenue previous month; the
-- original IS NOT NULL check alone still allowed division by zero.
sales_with_growth AS (
    SELECT
        month,
        revenue,
        prev_month_revenue,
        CASE
            WHEN prev_month_revenue IS NOT NULL THEN
                ROUND((revenue - prev_month_revenue) / NULLIF(prev_month_revenue, 0) * 100, 2)
            ELSE NULL
        END as mom_growth_pct
    FROM sales_with_metrics
)
SELECT *
FROM sales_with_growth
WHERE mom_growth_pct IS NOT NULL
ORDER BY month DESC;
-- ===== RECURSIVE CTE: hierarchies =====
-- Employee org chart: walk the manager chain from the root downwards.
WITH RECURSIVE employee_hierarchy AS (
    -- Base case: the root of the tree (employee with no manager, e.g. CEO)
    SELECT
        id,
        name,
        manager_id,
        1 as level,
        name::TEXT as path
    FROM employees
    WHERE manager_id IS NULL
    UNION ALL
    -- Recursive case: attach each employee under their manager,
    -- extending the readable path and incrementing the depth
    SELECT
        e.id,
        e.name,
        e.manager_id,
        eh.level + 1,
        eh.path || ' > ' || e.name
    FROM employees e
    INNER JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT
    id,
    name,
    level,
    path
FROM employee_hierarchy
ORDER BY level, name;
-- Calendar date series for 2024.
-- NOTE(review): in PostgreSQL, generate_series() is the simpler tool;
-- the recursive form is portable to engines that lack it.
WITH RECURSIVE date_series AS (
    SELECT DATE '2024-01-01' as date
    UNION ALL
    SELECT date + INTERVAL '1 day'
    FROM date_series
    WHERE date < DATE '2024-12-31'
)
-- LEFT JOIN keeps days with no orders; COALESCE reports them as 0 revenue
SELECT
    ds.date,
    COALESCE(SUM(o.total_amount), 0) as revenue
FROM date_series ds
LEFT JOIN orders o ON DATE(o.order_date) = ds.date
GROUP BY ds.date
ORDER BY ds.date;
Queries complejos para Data Engineering:
-- ===== COHORT ANALYSIS =====
-- Retention by acquisition cohort (month of first purchase).
WITH first_purchase AS (
    -- Each customer's cohort = calendar month of their first order
    SELECT
        customer_id,
        MIN(DATE_TRUNC('month', order_date)) as cohort_month
    FROM orders
    GROUP BY customer_id
),
customer_orders AS (
    -- Whole months elapsed between the cohort month and each order;
    -- AGE() returns an interval, decomposed here into years*12 + months
    SELECT
        o.customer_id,
        fp.cohort_month,
        DATE_TRUNC('month', o.order_date) as order_month,
        EXTRACT(YEAR FROM AGE(o.order_date, fp.cohort_month)) * 12 +
        EXTRACT(MONTH FROM AGE(o.order_date, fp.cohort_month)) as months_since_first
    FROM orders o
    JOIN first_purchase fp ON o.customer_id = fp.customer_id
)
-- One row per (cohort, month offset): how many customers came back
SELECT
    cohort_month,
    months_since_first,
    COUNT(DISTINCT customer_id) as customers,
    COUNT(*) as orders
FROM customer_orders
GROUP BY cohort_month, months_since_first
ORDER BY cohort_month, months_since_first;
-- ===== FUNNEL ANALYSIS =====
-- Conversion funnel over the last 7 days, one flag row per user.
WITH funnel_steps AS (
    -- MAX(CASE ...) yields 1 if the user ever hit the step, else 0
    SELECT
        user_id,
        MAX(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as viewed,
        MAX(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as added_to_cart,
        MAX(CASE WHEN event_type = 'checkout' THEN 1 ELSE 0 END) as checked_out,
        MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchased
    FROM events
    WHERE event_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY user_id
)
SELECT
    COUNT(*) as total_users,
    SUM(viewed) as step1_viewed,
    SUM(added_to_cart) as step2_add_to_cart,
    SUM(checked_out) as step3_checkout,
    SUM(purchased) as step4_purchase,
    -- ::NUMERIC avoids integer division; NULLIF avoids division by zero
    ROUND(SUM(added_to_cart)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as conversion_to_cart,
    ROUND(SUM(purchased)::NUMERIC / NULLIF(SUM(viewed), 0) * 100, 2) as overall_conversion
FROM funnel_steps;
-- ===== RFM ANALYSIS (Recency, Frequency, Monetary) =====
WITH customer_rfm AS (
    SELECT
        customer_id,
        -- Recency: days since the last purchase (date - date = int days)
        CURRENT_DATE - MAX(order_date) as recency,
        -- Frequency: number of orders in the window
        COUNT(*) as frequency,
        -- Monetary: total revenue in the window
        SUM(total_amount) as monetary
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '1 year'
    GROUP BY customer_id
),
rfm_scores AS (
    SELECT
        customer_id,
        recency,
        frequency,
        monetary,
        -- Quintile scores 1-5 where 5 is always "best":
        -- recency DESC places the most recent buyers in the last bucket (5);
        -- frequency/monetary ASC place the heaviest buyers in bucket 5
        NTILE(5) OVER (ORDER BY recency DESC) as r_score,
        NTILE(5) OVER (ORDER BY frequency ASC) as f_score,
        NTILE(5) OVER (ORDER BY monetary ASC) as m_score
    FROM customer_rfm
)
SELECT
    customer_id,
    -- Segment labels from the classic RFM grid (first matching rule wins)
    CASE
        WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champions'
        WHEN r_score >= 3 AND f_score >= 3 THEN 'Loyal Customers'
        WHEN r_score >= 4 AND f_score <= 2 THEN 'New Customers'
        WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
        WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
        ELSE 'Others'
    END as segment,
    r_score,
    f_score,
    m_score
FROM rfm_scores;
Receta 5.7: Python para Data Engineering
Pandas - Manipulación de Datos:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ===== READING DATA =====
# CSV (parse_dates converts the column to datetime64 on load)
df = pd.read_csv('sales.csv', parse_dates=['order_date'])
# Excel
df = pd.read_excel('data.xlsx', sheet_name='Sales')
# JSON
df = pd.read_json('data.json')
# SQL
from sqlalchemy import create_engine
# NOTE(review): credentials belong in config/env vars, not in source code
engine = create_engine('postgresql://user:pass@host:5432/db')
df = pd.read_sql("SELECT * FROM orders", engine)
# Parquet
df = pd.read_parquet('data.parquet')
# ===== EXPLORATION =====
# Basic info: first rows, dtypes/memory, numeric summary statistics
print(df.head())
print(df.info())
print(df.describe())
# Distinct values and their counts
print(df['category'].unique())
print(df['category'].value_counts())
# Null counts per column
print(df.isnull().sum())
print(df.isnull().sum() / len(df) * 100)  # Percentage of nulls per column
# ===== CLEANING =====
# Drop duplicate orders (keeps the first occurrence)
df_clean = df.drop_duplicates(subset=['order_id'])
# Drop rows missing critical columns
df_clean = df.dropna(subset=['customer_id', 'total_amount'])
# Fill nulls with per-column defaults
df_filled = df.fillna({
    'discount': 0,
    'notes': 'No notes',
    'category': 'Uncategorized'
})
# Forward fill (propagate the previous valid value downwards).
# Fix: fillna(method=...) is deprecated since pandas 2.1 — the dedicated
# ffill()/bfill() methods are the supported, behavior-identical form.
df['price'] = df['price'].ffill()
# Backward fill (propagate the next valid value upwards)
df['price'] = df['price'].bfill()
# Linear interpolation for numeric gaps
df['sales'] = df['sales'].interpolate(method='linear')
# ===== TRANSFORMATIONS =====
# Derived columns (dt accessor requires datetime64 dtype on order_date)
df['revenue'] = df['quantity'] * df['price']
df['order_year'] = df['order_date'].dt.year
df['order_month'] = df['order_date'].dt.month
df['day_of_week'] = df['order_date'].dt.day_name()
# Apply function
def categorize_amount(amount):
    """Bucket an amount into 'Low' (< 100), 'Medium' (< 500) or 'High'."""
    for upper_bound, label in ((100, 'Low'), (500, 'Medium')):
        if amount < upper_bound:
            return label
    return 'High'
df['amount_category'] = df['total_amount'].apply(categorize_amount)
# Fix: vectorized .str.upper() replaces apply(lambda x: x.upper());
# the lambda raised AttributeError on NaN values, .str.upper() keeps NaN
df['customer_name_upper'] = df['customer_name'].str.upper()
# Map with a dictionary (categories missing from the map become NaN)
category_map = {'A': 'Premium', 'B': 'Standard', 'C': 'Basic'}
df['tier'] = df['category'].map(category_map)
# String normalization
df['email'] = df['email'].str.lower().str.strip()
df['product_name'] = df['product_name'].str.title()
# ===== AGGREGATIONS =====
# Simple group by with multiple aggregates per column
summary = df.groupby('customer_id').agg({
    'order_id': 'count',
    'total_amount': ['sum', 'mean', 'max'],
    'order_date': 'max'
}).reset_index()
# Flatten the MultiIndex columns produced by the nested agg spec;
# order must match the agg dict above
summary.columns = [
    'customer_id',
    'total_orders',
    'total_revenue',
    'avg_order_value',
    'max_order_value',
    'last_order_date'
]
# Group by several keys (calendar month + category)
monthly_summary = df.groupby([
    df['order_date'].dt.to_period('M'),
    'product_category'
]).agg({
    'total_amount': 'sum',
    'order_id': 'count'
}).reset_index()
# Custom aggregation function
def revenue_per_customer(group):
    """Average revenue per distinct customer within one group."""
    total_revenue = group['total_amount'].sum()
    distinct_customers = group['customer_id'].nunique()
    return total_revenue / distinct_customers
df.groupby('product_category').apply(revenue_per_customer)
# ===== PIVOT TABLES =====
# One row per customer, one column per category, summed amounts;
# fill_value=0 replaces missing combinations
pivot = df.pivot_table(
    values='total_amount',
    index='customer_id',
    columns='product_category',
    aggfunc='sum',
    fill_value=0
)
# Several aggregate functions at once (produces MultiIndex columns)
pivot_multi = df.pivot_table(
    values='total_amount',
    index='order_date',
    columns='product_category',
    aggfunc=['sum', 'count', 'mean']
)
# ===== MERGE AND JOIN =====
# Inner join: keep only orders with a matching customer
df_merged = pd.merge(
    df_orders,
    df_customers,
    left_on='customer_id',
    right_on='id',
    how='inner'
)
# Left join: keep every order; customer columns are NaN when unmatched
df_all_orders = pd.merge(
    df_orders,
    df_customers,
    left_on='customer_id',
    right_on='id',
    how='left'
)
# Chained joins (merge defaults to how='inner')
df_complete = df_orders \
    .merge(df_customers, left_on='customer_id', right_on='id') \
    .merge(df_products, left_on='product_id', right_on='id')
# ===== WINDOW FUNCTIONS (pandas equivalents) =====
# Ranking within each customer (method='dense' = no gaps on ties)
df['revenue_rank'] = df.groupby('customer_id')['total_amount'].rank(
    ascending=False,
    method='dense'
)
# Cumulative sum per customer (SQL running total)
df['cumulative_revenue'] = df.groupby('customer_id')['total_amount'].cumsum()
# Moving average over the last 3 rows per customer
# (min_periods=1 emits a value even before the window is full)
df['moving_avg_3'] = df.groupby('customer_id')['total_amount'] \
    .transform(lambda x: x.rolling(window=3, min_periods=1).mean())
# shift = SQL LAG (positive offset) / LEAD (negative offset)
df['prev_order_amount'] = df.groupby('customer_id')['total_amount'].shift(1)
df['next_order_amount'] = df.groupby('customer_id')['total_amount'].shift(-1)
# ===== TIME SERIES =====
# Resample = time-based aggregation; requires a DatetimeIndex
df.set_index('order_date', inplace=True)
# Daily to monthly
# NOTE(review): the 'M' alias is deprecated in pandas >= 2.2 in favor of
# 'ME' (month-end) — confirm the target pandas version before changing.
monthly = df.resample('M').agg({
    'total_amount': 'sum',
    'order_id': 'count'
})
# Rolling average over the last 7 rows of the ordered index
df['7day_avg'] = df['total_amount'].rolling(window=7).mean()
# ===== COMPLETE USE CASE =====
class DataPipeline:
    """Complete ETL pipeline built on Pandas.

    Fluent interface: every stage returns self so stages can be chained,
    e.g. extract().clean().transform().validate().load(path).
    """

    def __init__(self, input_path: str):
        # Path of the raw CSV file to process
        self.input_path = input_path
        # Working DataFrame; populated by extract()
        self.df = None

    def extract(self):
        """Extract: load the raw CSV, parsing order_date as datetime."""
        print("📥 Extracting data...")
        self.df = pd.read_csv(self.input_path, parse_dates=['order_date'])
        print(f"✅ Loaded {len(self.df)} rows")
        return self

    def clean(self):
        """Clean: drop duplicates and nulls, normalize strings, filter invalid rows."""
        print("🧹 Cleaning data...")
        # Drop duplicate orders
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=['order_id'])
        print(f" Removed {before - len(self.df)} duplicates")
        # Drop rows missing critical columns
        before = len(self.df)
        self.df = self.df.dropna(subset=['customer_id', 'total_amount'])
        print(f" Removed {before - len(self.df)} rows with nulls")
        # Fill remaining nulls with defaults
        self.df['discount'] = self.df['discount'].fillna(0)
        # Normalize strings
        self.df['customer_email'] = self.df['customer_email'].str.lower().str.strip()
        # Filter out non-positive amounts
        before = len(self.df)
        self.df = self.df[self.df['total_amount'] > 0]
        print(f" Removed {before - len(self.df)} invalid amounts")
        print(f"✅ Clean dataset: {len(self.df)} rows")
        return self

    def transform(self):
        """Transform: add derived columns, revenue tiers and per-customer metrics."""
        print("⚙️ Transforming data...")
        # Derived columns (order_year/order_month also feed load()'s partitioning)
        self.df['revenue'] = self.df['quantity'] * self.df['price']
        self.df['order_year'] = self.df['order_date'].dt.year
        self.df['order_month'] = self.df['order_date'].dt.month
        # Bucket total_amount into labeled tiers
        self.df['revenue_tier'] = pd.cut(
            self.df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )
        # Per-customer aggregates, merged back onto every order row
        customer_metrics = self.df.groupby('customer_id').agg({
            'order_id': 'count',
            'total_amount': 'sum'
        }).rename(columns={
            'order_id': 'total_orders',
            'total_amount': 'customer_ltv'
        })
        self.df = self.df.merge(customer_metrics, on='customer_id', how='left')
        print("✅ Transformation complete")
        return self

    def validate(self):
        """Validate data quality; raises AssertionError on the first failed check."""
        print("✓ Validating data quality...")
        # Check 1: no nulls in critical columns
        critical_cols = ['order_id', 'customer_id', 'total_amount']
        nulls = self.df[critical_cols].isnull().sum()
        assert nulls.sum() == 0, f"Found nulls in critical columns: {nulls}"
        # Check 2: no duplicate orders
        duplicates = self.df.duplicated(subset=['order_id']).sum()
        assert duplicates == 0, f"Found {duplicates} duplicate order_ids"
        # Check 3: strictly positive amounts
        assert (self.df['total_amount'] > 0).all(), "Found negative amounts"
        # Check 4: no dates in the future
        assert (self.df['order_date'] <= datetime.now()).all(), "Found future dates"
        print("✅ All validation checks passed")
        return self

    def load(self, output_path: str):
        """Load: write the processed dataset as snappy-compressed, partitioned Parquet.

        NOTE(review): partition_cols relies on order_year/order_month created
        in transform(), so load() must run after transform().
        """
        print(f"💾 Loading to {output_path}...")
        self.df.to_parquet(
            output_path,
            index=False,
            compression='snappy',
            partition_cols=['order_year', 'order_month']
        )
        print(f"✅ Saved {len(self.df)} rows")
        return self

    def run(self, output_path: str):
        """Run the full pipeline end to end."""
        print(f"\n{'='*60}")
        print(f"🚀 Starting Pandas ETL Pipeline")
        print(f"{'='*60}\n")
        self.extract() \
            .clean() \
            .transform() \
            .validate() \
            .load(output_path)
        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")
# Usage: process raw_sales.csv into a partitioned Parquet dataset
pipeline = DataPipeline('raw_sales.csv')
pipeline.run('processed_sales.parquet')
NumPy - Operaciones Numéricas:
import numpy as np

# ===== ARRAYS =====
# Creating arrays
arr = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
# Special constructors
zeros = np.zeros((3, 4))
ones = np.ones((2, 3))
identity = np.eye(3)
random = np.random.rand(3, 3)
# Ranges
arr = np.arange(0, 10, 2)  # [0, 2, 4, 6, 8]
arr = np.linspace(0, 1, 5)  # [0, 0.25, 0.5, 0.75, 1]
# ===== OPERATIONS =====
# Element-wise arithmetic
arr = np.array([1, 2, 3])
arr * 2  # [2, 4, 6]
arr + arr  # [2, 4, 6]
arr ** 2  # [1, 4, 9]
# Aggregations
arr.sum()
arr.mean()
arr.std()
arr.max()
arr.min()
# Axis-wise operations
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_2d.sum(axis=0)  # [5, 7, 9] - column sums
arr_2d.sum(axis=1)  # [6, 15] - row sums
# ===== INDEXING AND SLICING =====
arr = np.array([1, 2, 3, 4, 5])
arr[0]  # 1
arr[-1]  # 5
arr[1:4]  # [2, 3, 4]
arr[arr > 2]  # [3, 4, 5] - boolean indexing
# ===== BROADCASTING =====
# The 1-D array is stretched across every row of the 2-D array
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_1d = np.array([10, 20, 30])
result = arr_2d + arr_1d  # Broadcasting
# [[11, 22, 33],
#  [14, 25, 36]]
# ===== USE CASE: metrics processing =====
def calculate_moving_average(data, window):
    """Simple moving average via convolution with a uniform kernel.

    Returns len(data) - window + 1 points (mode='valid' drops the edges).
    """
    kernel = np.full(window, 1.0 / window)
    return np.convolve(data, kernel, mode='valid')
# Daily sales data (random demo input: 365 ints in [100, 500))
sales = np.random.randint(100, 500, size=365)
# 7-day moving average
ma_7 = calculate_moving_average(sales, 7)
# Outlier detection: values more than 2 standard deviations from the mean
mean = sales.mean()
std = sales.std()
outliers = sales[(sales < mean - 2*std) | (sales > mean + 2*std)]