📊 Developer Cookbook - FASE 5: SQL y Python para Data Engineering

Recetas prácticas para consultas avanzadas y procesamiento de datos con Python


📚 Tabla de Contenidos

  1. Receta 5.6: SQL Avanzado - Window Functions y CTEs
  2. Receta 5.7: Python para Data Engineering

Receta 5.6: SQL Avanzado - Window Functions y CTEs

Window Functions para Analytics:

-- ===== RANKING =====

-- ROW_NUMBER: unique sequential number per row, even when sort keys tie.
-- Numbers each customer's orders from newest to oldest.
SELECT
    o.customer_id,
    o.order_date,
    o.total_amount,
    ROW_NUMBER() OVER newest_first AS order_number
FROM orders AS o
WINDOW newest_first AS (
    PARTITION BY o.customer_id
    ORDER BY o.order_date DESC
);

-- RANK: tied rows share a rank and the following rank is skipped (gaps).
SELECT
    p.product_name,
    p.category,
    p.revenue,
    RANK() OVER category_by_revenue AS revenue_rank
FROM products AS p
WINDOW category_by_revenue AS (
    PARTITION BY p.category
    ORDER BY p.revenue DESC
);

-- DENSE_RANK: tied rows share a rank and no rank is skipped (no gaps).
SELECT
    s.salesperson,
    s.sales_amount,
    DENSE_RANK() OVER top_sellers AS rank
FROM sales AS s
WINDOW top_sellers AS (
    ORDER BY s.sales_amount DESC
);

-- ===== DISTRIBUTION =====

-- NTILE: split the ordered rows into N buckets (here 4, highest revenue first).
SELECT
    cs.customer_id,
    cs.total_revenue,
    NTILE(4) OVER revenue_high_to_low AS quartile
FROM customer_summary AS cs
WINDOW revenue_high_to_low AS (
    ORDER BY cs.total_revenue DESC
);

-- PERCENT_RANK: relative position of each row within the price ordering.
SELECT
    p.product_id,
    p.price,
    PERCENT_RANK() OVER price_low_to_high AS price_percentile
FROM products AS p
WINDOW price_low_to_high AS (
    ORDER BY p.price
);

-- ===== WINDOWED AGGREGATIONS =====

-- Running total: sum from the first row through the current row.
-- "ROWS UNBOUNDED PRECEDING" is the shorthand for
-- "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
SELECT
    ds.order_date,
    ds.daily_revenue,
    SUM(ds.daily_revenue) OVER (
        ORDER BY ds.order_date
        ROWS UNBOUNDED PRECEDING
    ) AS cumulative_revenue
FROM daily_sales AS ds;

-- Moving average over the current row and the 6 rows before it.
-- The frame counts rows, so it spans 7 days only if daily_revenue holds
-- exactly one row per day -- TODO confirm against the table definition.
SELECT
    dr.sale_date,
    dr.revenue,
    AVG(dr.revenue) OVER (
        ORDER BY dr.sale_date
        ROWS 6 PRECEDING
    ) AS moving_avg_7day
FROM daily_revenue AS dr;

-- Year-over-Year comparison
-- Each row is matched to the row dated exactly one calendar year earlier.
-- A date-based self-join is used instead of LAG(revenue, 365) because a
-- 365-row offset drifts in leap years and whenever a day is missing.
-- Assumes sale_date is a DATE with at most one row per day -- TODO confirm.
-- Rows with no match one year back get NULL in the three derived columns.
SELECT
    cur.sale_date,
    cur.revenue,
    prev.revenue AS revenue_last_year,
    cur.revenue - prev.revenue AS yoy_change,
    ROUND(
        -- NULLIF avoids a division-by-zero error when last year's revenue was 0.
        (cur.revenue - prev.revenue) / NULLIF(prev.revenue, 0) * 100,
        2
    ) AS yoy_change_pct
FROM daily_revenue AS cur
LEFT JOIN daily_revenue AS prev
    -- date - interval yields a timestamp in PostgreSQL; cast back to DATE.
    ON prev.sale_date = CAST(cur.sale_date - INTERVAL '1 year' AS DATE)
ORDER BY cur.sale_date;

-- ===== LEAD and LAG =====

-- Previous and next order per customer, plus the gap since the previous one.
-- All three window calls share one named window: each customer's orders in
-- chronological order. The first order of a customer has no previous row, so
-- previous_order_date and days_since_last_order are NULL there.
SELECT
    o.customer_id,
    o.order_date,
    o.total_amount,
    LAG(o.order_date) OVER customer_timeline AS previous_order_date,
    LEAD(o.order_date) OVER customer_timeline AS next_order_date,
    o.order_date - LAG(o.order_date) OVER customer_timeline AS days_since_last_order
FROM orders AS o
WINDOW customer_timeline AS (
    PARTITION BY o.customer_id
    ORDER BY o.order_date
);

-- ===== FIRST_VALUE and LAST_VALUE =====

-- Compare every price row with the first and last price of its product.
-- The frame spans the whole partition (UNBOUNDED PRECEDING to UNBOUNDED
-- FOLLOWING) so LAST_VALUE sees the partition's final row rather than
-- stopping at the current row.
SELECT
    pp.product_id,
    pp.sale_date,
    pp.price,
    FIRST_VALUE(pp.price) OVER full_price_history AS initial_price,
    LAST_VALUE(pp.price) OVER full_price_history AS current_price
FROM product_prices AS pp
WINDOW full_price_history AS (
    PARTITION BY pp.product_id
    ORDER BY pp.sale_date
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
);

CTEs (Common Table Expressions):

-- ===== SIMPLE CTE =====

-- Pre-aggregate order metrics per customer, then attach customer details
-- and keep only customers whose total revenue exceeds 1000.
WITH order_stats AS (
    SELECT
        o.customer_id,
        COUNT(*) AS total_orders,
        SUM(o.total_amount) AS total_revenue,
        AVG(o.total_amount) AS avg_order_value,
        MAX(o.order_date) AS last_order_date
    FROM orders AS o
    GROUP BY o.customer_id
)
SELECT
    cust.customer_name,
    cust.email,
    stats.total_orders,
    stats.total_revenue,
    stats.avg_order_value,
    CURRENT_DATE - stats.last_order_date AS days_since_last_order
FROM customers AS cust
INNER JOIN order_stats AS stats
    ON stats.customer_id = cust.id
WHERE stats.total_revenue > 1000
ORDER BY stats.total_revenue DESC;

-- ===== MULTIPLE CTEs =====

-- Month-over-month revenue growth, built in three steps.
WITH
-- CTE 1: revenue and order count per calendar month
monthly_sales AS (
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        SUM(total_amount) AS revenue,
        COUNT(*) AS order_count
    FROM orders
    GROUP BY DATE_TRUNC('month', order_date)
),
-- CTE 2: add derived metrics, including the previous month's revenue
sales_with_metrics AS (
    SELECT
        month,
        revenue,
        order_count,
        -- order_count comes from COUNT(*) over a non-empty group, so it is never 0
        revenue / order_count AS avg_order_value,
        LAG(revenue) OVER (ORDER BY month) AS prev_month_revenue
    FROM monthly_sales
),
-- CTE 3: growth percentage versus the previous month
sales_with_growth AS (
    SELECT
        month,
        revenue,
        prev_month_revenue,
        -- NULLIF turns a zero previous month into NULL, so the division yields
        -- NULL instead of raising a division-by-zero error. A NULL previous
        -- month (the first month) also propagates to NULL.
        ROUND(
            (revenue - prev_month_revenue) / NULLIF(prev_month_revenue, 0) * 100,
            2
        ) AS mom_growth_pct
    FROM sales_with_metrics
)
-- Explicit column list instead of SELECT * so the output shape is stable.
SELECT
    month,
    revenue,
    prev_month_revenue,
    mom_growth_pct
FROM sales_with_growth
WHERE mom_growth_pct IS NOT NULL
ORDER BY month DESC;

-- ===== RECURSIVE CTE: Hierarchies =====

-- Employee org chart: walks from employees without a manager down through
-- their reports, tracking depth and a readable "A > B > C" path.
WITH RECURSIVE org_tree AS (
    -- Anchor: rows with no manager (top of the hierarchy)
    SELECT
        root.id,
        root.name,
        root.manager_id,
        1 AS level,
        CAST(root.name AS TEXT) AS path
    FROM employees AS root
    WHERE root.manager_id IS NULL

    UNION ALL

    -- Recursive step: employees whose manager is already in the tree
    SELECT
        report.id,
        report.name,
        report.manager_id,
        boss.level + 1,
        boss.path || ' > ' || report.name
    FROM employees AS report
    INNER JOIN org_tree AS boss
        ON boss.id = report.manager_id
)
SELECT
    id,
    name,
    level,
    path
FROM org_tree
ORDER BY level, name;

-- Date series
-- Generates one row per day of 2024 and left-joins orders onto it, so days
-- without any order still appear with revenue 0.
WITH RECURSIVE date_series AS (
    SELECT DATE '2024-01-01' AS date
    UNION ALL
    -- DATE + INTERVAL yields TIMESTAMP in PostgreSQL. Without the cast the
    -- recursive term's column type differs from the anchor's DATE and the
    -- query is rejected.
    SELECT CAST(date + INTERVAL '1 day' AS DATE)
    FROM date_series
    WHERE date < DATE '2024-12-31'
)
SELECT
    ds.date,
    COALESCE(SUM(o.total_amount), 0) AS revenue
FROM date_series AS ds
LEFT JOIN orders AS o
    -- Half-open range on the raw column instead of DATE(o.order_date) = ds.date:
    -- same day-level match, but an index on order_date stays usable.
    ON o.order_date >= ds.date
    AND o.order_date < ds.date + INTERVAL '1 day'
GROUP BY ds.date
ORDER BY ds.date;

Queries complejos para Data Engineering:

-- ===== COHORT ANALYSIS =====

-- Retention by cohort: customers are grouped by the month of their first
-- order, then activity is counted by whole months elapsed since that month.
WITH cohort_assignment AS (
    -- Month of each customer's earliest order
    SELECT
        o.customer_id,
        MIN(DATE_TRUNC('month', o.order_date)) AS cohort_month
    FROM orders AS o
    GROUP BY o.customer_id
),
orders_by_cohort AS (
    SELECT
        o.customer_id,
        ca.cohort_month,
        DATE_TRUNC('month', o.order_date) AS order_month,
        -- AGE() returns an interval; years * 12 + months = whole months elapsed
        EXTRACT(YEAR FROM AGE(o.order_date, ca.cohort_month)) * 12
            + EXTRACT(MONTH FROM AGE(o.order_date, ca.cohort_month)) AS months_since_first
    FROM orders AS o
    INNER JOIN cohort_assignment AS ca
        ON ca.customer_id = o.customer_id
)
SELECT
    cohort_month,
    months_since_first,
    COUNT(DISTINCT customer_id) AS customers,
    COUNT(*) AS orders
FROM orders_by_cohort
GROUP BY
    cohort_month,
    months_since_first
ORDER BY
    cohort_month,
    months_since_first;

-- ===== FUNNEL ANALYSIS =====

-- Conversion funnel over the last 7 days.
-- Step 1 collapses events to one row per user with a 0/1 flag per step;
-- step 2 sums the flags and derives conversion percentages.
-- The flags record only that an event type occurred, not the order of events.
WITH user_step_flags AS (
    SELECT
        e.user_id,
        MAX(CASE WHEN e.event_type = 'page_view' THEN 1 ELSE 0 END) AS viewed,
        MAX(CASE WHEN e.event_type = 'add_to_cart' THEN 1 ELSE 0 END) AS added_to_cart,
        MAX(CASE WHEN e.event_type = 'checkout' THEN 1 ELSE 0 END) AS checked_out,
        MAX(CASE WHEN e.event_type = 'purchase' THEN 1 ELSE 0 END) AS purchased
    FROM events AS e
    WHERE e.event_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY e.user_id
)
SELECT
    COUNT(*) AS total_users,
    SUM(viewed) AS step1_viewed,
    SUM(added_to_cart) AS step2_add_to_cart,
    SUM(checked_out) AS step3_checkout,
    SUM(purchased) AS step4_purchase,
    -- NULLIF guards the division when nobody viewed a page
    ROUND(
        CAST(SUM(added_to_cart) AS NUMERIC) / NULLIF(SUM(viewed), 0) * 100,
        2
    ) AS conversion_to_cart,
    ROUND(
        CAST(SUM(purchased) AS NUMERIC) / NULLIF(SUM(viewed), 0) * 100,
        2
    ) AS overall_conversion
FROM user_step_flags;

-- ===== RFM ANALYSIS (Recency, Frequency, Monetary) =====

-- Scores each customer 1-5 on recency, frequency and monetary value over
-- the last year of orders, then maps score combinations to a segment label.
WITH rfm_base AS (
    SELECT
        o.customer_id,
        -- Recency: time elapsed since the most recent order
        CURRENT_DATE - MAX(o.order_date) AS recency,
        -- Frequency: number of orders
        COUNT(*) AS frequency,
        -- Monetary: total revenue
        SUM(o.total_amount) AS monetary
    FROM orders AS o
    WHERE o.order_date >= CURRENT_DATE - INTERVAL '1 year'
    GROUP BY o.customer_id
),
rfm_quintiles AS (
    SELECT
        b.customer_id,
        b.recency,
        b.frequency,
        b.monetary,
        -- Quintile scores 1-5. Recency sorts DESC so the smallest recency
        -- (most recent buyer) lands in bucket 5; frequency and monetary sort
        -- ASC so the largest values land in bucket 5.
        NTILE(5) OVER (ORDER BY b.recency DESC) AS r_score,
        NTILE(5) OVER (ORDER BY b.frequency ASC) AS f_score,
        NTILE(5) OVER (ORDER BY b.monetary ASC) AS m_score
    FROM rfm_base AS b
)
SELECT
    q.customer_id,
    -- Branch order matters: the first matching WHEN wins.
    CASE
        WHEN q.r_score >= 4 AND q.f_score >= 4 AND q.m_score >= 4 THEN 'Champions'
        WHEN q.r_score >= 3 AND q.f_score >= 3 THEN 'Loyal Customers'
        WHEN q.r_score >= 4 AND q.f_score <= 2 THEN 'New Customers'
        WHEN q.r_score <= 2 AND q.f_score >= 3 THEN 'At Risk'
        WHEN q.r_score <= 2 AND q.f_score <= 2 THEN 'Lost'
        ELSE 'Others'
    END AS segment,
    q.r_score,
    q.f_score,
    q.m_score
FROM rfm_quintiles AS q;

Receta 5.7: Python para Data Engineering

Pandas - Manipulación de Datos:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ===== READING DATA =====
# Each reader below rebinds `df`; the snippets are alternatives, not a sequence.

# CSV (order_date parsed as datetime)
df = pd.read_csv(filepath_or_buffer='sales.csv', parse_dates=['order_date'])

# Excel (single named sheet)
df = pd.read_excel(io='data.xlsx', sheet_name='Sales')

# JSON
df = pd.read_json(path_or_buf='data.json')

# SQL (via a SQLAlchemy engine)
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@host:5432/db')
df = pd.read_sql(sql="SELECT * FROM orders", con=engine)

# Parquet
df = pd.read_parquet(path='data.parquet')

# ===== EXPLORATION =====

# Basic info
print(df.head())
# info() writes its report to stdout itself and returns None, so wrapping it
# in print() would append a stray "None" line.
df.info()
print(df.describe())

# Unique values and their frequencies
print(df['category'].unique())
print(df['category'].value_counts())

# Null counts per column, then as a percentage of all rows
null_counts = df.isnull().sum()
print(null_counts)
print(null_counts / len(df) * 100)  # Percentage

# ===== CLEANING =====

# Remove duplicate orders
df_clean = df.drop_duplicates(subset=['order_id'])

# Remove rows with nulls in required columns.
# Chained on df_clean (not df) so the de-duplication above is not discarded.
df_clean = df_clean.dropna(subset=['customer_id', 'total_amount'])

# Fill nulls with a per-column default
df_filled = df.fillna({
    'discount': 0,
    'notes': 'No notes',
    'category': 'Uncategorized'
})

# Forward fill (carry the previous value forward).
# .ffill()/.bfill() replace fillna(method=...), which is deprecated in pandas.
df['price'] = df['price'].ffill()

# Backward fill (fills whatever forward fill could not, i.e. leading nulls)
df['price'] = df['price'].bfill()

# Linearly interpolate numeric gaps
df['sales'] = df['sales'].interpolate(method='linear')

# ===== TRANSFORMATIONS =====

# Derived columns: line revenue plus calendar parts of the order date.
# The .dt accessor is taken once and reused for the three date-based columns.
order_dt = df['order_date'].dt
df['revenue'] = df['quantity'] * df['price']
df['order_year'] = order_dt.year
df['order_month'] = order_dt.month
df['day_of_week'] = order_dt.day_name()
# Apply function
def categorize_amount(amount):
    """Bucket an amount into 'Low' (< 100), 'Medium' (< 500) or 'High'.

    Args:
        amount: Numeric amount; may be None or NaN when the value is missing.

    Returns:
        'Low', 'Medium' or 'High', or None when the amount is missing.
    """
    # NaN compares False against every threshold, so without this guard a
    # missing amount would fall through to 'High'. `amount != amount` is True
    # only for NaN.
    if amount is None or amount != amount:
        return None
    if amount < 100:
        return 'Low'
    if amount < 500:
        return 'Medium'
    return 'High'

# Element-wise function application
df['amount_category'] = df['total_amount'].apply(categorize_amount)

# Upper-case names with the vectorised string accessor.
# .str.upper() propagates missing values, whereas apply(lambda x: x.upper())
# raises AttributeError on the first NaN.
df['customer_name_upper'] = df['customer_name'].str.upper()

# Map with a dictionary (keys absent from the dict become NaN)
category_map = {'A': 'Premium', 'B': 'Standard', 'C': 'Basic'}
df['tier'] = df['category'].map(category_map)

# String operations
df['email'] = df['email'].str.lower().str.strip()
df['product_name'] = df['product_name'].str.title()

# ===== AGGREGATIONS =====

# Per-customer summary. Named aggregation sets the output column names
# directly, so no positional rename of the result columns is needed.
summary = (
    df.groupby('customer_id')
    .agg(
        total_orders=('order_id', 'count'),
        total_revenue=('total_amount', 'sum'),
        avg_order_value=('total_amount', 'mean'),
        max_order_value=('total_amount', 'max'),
        last_order_date=('order_date', 'max'),
    )
    .reset_index()
)

# Group by several keys: calendar month of the order and product category
order_month_key = df['order_date'].dt.to_period('M')
monthly_summary = (
    df.groupby([order_month_key, 'product_category'])
    .agg(
        total_amount=('total_amount', 'sum'),
        order_id=('order_id', 'count'),
    )
    .reset_index()
)

# Custom aggregation function
def revenue_per_customer(group):
    """Return the group's total amount divided by its number of distinct customers.

    Args:
        group: DataFrame with 'total_amount' and 'customer_id' columns.
    """
    total_revenue = group['total_amount'].sum()
    distinct_customers = group['customer_id'].nunique()
    return total_revenue / distinct_customers

# Apply the custom aggregation per product category (result is not stored)
df.groupby('product_category').apply(revenue_per_customer)

# ===== PIVOT TABLES =====

# Pivot: customers as rows, product categories as columns, summed amounts;
# customer/category combinations with no data are filled with 0.
pivot = pd.pivot_table(
    df,
    index='customer_id',
    columns='product_category',
    values='total_amount',
    aggfunc='sum',
    fill_value=0,
)

# Several aggregations at once: one column group per aggregation function
pivot_multi = pd.pivot_table(
    df,
    index='order_date',
    columns='product_category',
    values='total_amount',
    aggfunc=['sum', 'count', 'mean'],
)

# ===== MERGE AND JOIN =====

# Inner join: only orders whose customer_id matches a customer id
df_merged = df_orders.merge(
    df_customers,
    how='inner',
    left_on='customer_id',
    right_on='id',
)

# Left join: every order is kept, with customer columns filled where matched
df_all_orders = df_orders.merge(
    df_customers,
    how='left',
    left_on='customer_id',
    right_on='id',
)

# Multiple joins: orders -> customers -> products (default inner joins)
orders_with_customers = df_orders.merge(
    df_customers, left_on='customer_id', right_on='id'
)
df_complete = orders_with_customers.merge(
    df_products, left_on='product_id', right_on='id'
)

# ===== WINDOW FUNCTIONS =====

# All results are computed from one per-customer grouping of total_amount
# before any column is written back to df.
amount_by_customer = df.groupby('customer_id')['total_amount']

# Ranking: dense rank, highest amount first
dense_rank_desc = amount_by_customer.rank(method='dense', ascending=False)

# Cumulative sum within each customer
running_total = amount_by_customer.cumsum()

# Moving average over up to 3 rows (min_periods=1 emits a value from the first row)
rolling_mean_3 = amount_by_customer.transform(
    lambda amounts: amounts.rolling(window=3, min_periods=1).mean()
)

# Shift (lag/lead): previous and next row's amount within each customer
previous_amount = amount_by_customer.shift(1)
following_amount = amount_by_customer.shift(-1)

df['revenue_rank'] = dense_rank_desc
df['cumulative_revenue'] = running_total
df['moving_avg_3'] = rolling_mean_3
df['prev_order_amount'] = previous_amount
df['next_order_amount'] = following_amount

# ===== TIME SERIES =====

# Resampling (time-based aggregation) needs a datetime index;
# this mutates df in place.
df.set_index('order_date', inplace=True)

# Daily to monthly: sum the amounts and count the orders in each month
monthly_rules = {
    'total_amount': 'sum',
    'order_id': 'count',
}
monthly = df.resample('M').agg(monthly_rules)

# Rolling window over 7 rows (an integer window counts rows, not days)
df['7day_avg'] = df['total_amount'].rolling(7).mean()

# ===== CASO DE USO COMPLETO =====

class DataPipeline:
    """Pandas ETL pipeline: extract, clean, transform, validate, load.

    Every stage returns ``self`` so stages can be chained; ``run`` executes
    them in order. The working DataFrame lives in ``self.df``.
    """

    def __init__(self, input_path: str):
        """Store the CSV path; nothing is read until ``extract`` runs."""
        self.input_path = input_path
        self.df = None

    def _require_data(self):
        """Fail with a clear error when a stage runs before any data is loaded."""
        if self.df is None:
            raise RuntimeError("No data loaded: call extract() before this stage")

    @staticmethod
    def _check(condition, message):
        """Raise AssertionError with ``message`` when ``condition`` is false.

        An explicit raise (instead of an ``assert`` statement) keeps the
        quality checks active when Python runs with ``-O``.
        """
        if not condition:
            raise AssertionError(message)

    def extract(self):
        """Read the input CSV into ``self.df``, parsing ``order_date`` as datetime."""
        print("📥 Extracting data...")
        self.df = pd.read_csv(self.input_path, parse_dates=['order_date'])
        print(f"✅ Loaded {len(self.df)} rows")
        return self

    def clean(self):
        """Drop duplicate/incomplete/invalid rows and normalise a few columns.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        self._require_data()
        print("🧹 Cleaning data...")

        # Remove duplicate orders
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=['order_id'])
        print(f"  Removed {before - len(self.df)} duplicates")

        # Remove rows with nulls in critical columns.
        # .copy() gives an independent frame so the column assignments below
        # never write into a view of the previous DataFrame.
        before = len(self.df)
        self.df = self.df.dropna(subset=['customer_id', 'total_amount']).copy()
        print(f"  Removed {before - len(self.df)} rows with nulls")

        # Fill remaining nulls
        self.df['discount'] = self.df['discount'].fillna(0)

        # Normalise strings
        self.df['customer_email'] = self.df['customer_email'].str.lower().str.strip()

        # Filter out invalid (non-positive) amounts; copy again because
        # transform() adds columns to the filtered result.
        before = len(self.df)
        self.df = self.df[self.df['total_amount'] > 0].copy()
        print(f"  Removed {before - len(self.df)} invalid amounts")

        print(f"✅ Clean dataset: {len(self.df)} rows")
        return self

    def transform(self):
        """Add derived columns, a revenue tier and per-customer metrics.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        self._require_data()
        print("⚙️ Transforming data...")

        # Derived columns
        self.df['revenue'] = self.df['quantity'] * self.df['price']
        self.df['order_year'] = self.df['order_date'].dt.year
        self.df['order_month'] = self.df['order_date'].dt.month

        # Tiering by order amount; bins are right-inclusive: (0, 100], (100, 500], ...
        self.df['revenue_tier'] = pd.cut(
            self.df['total_amount'],
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Bronze', 'Silver', 'Gold', 'Platinum']
        )

        # Per-customer aggregates, kept as a regular column (as_index=False)
        # so the merge key below is explicit.
        customer_metrics = self.df.groupby('customer_id', as_index=False).agg(
            total_orders=('order_id', 'count'),
            customer_ltv=('total_amount', 'sum'),
        )

        self.df = self.df.merge(customer_metrics, on='customer_id', how='left')

        print("✅ Transformation complete")
        return self

    def validate(self):
        """Run data-quality checks on ``self.df``.

        Raises:
            RuntimeError: if no data has been loaded yet.
            AssertionError: if any quality check fails.
        """
        self._require_data()
        print("✓ Validating data quality...")

        # Check 1: no nulls in critical columns
        critical_cols = ['order_id', 'customer_id', 'total_amount']
        nulls = self.df[critical_cols].isnull().sum()
        self._check(nulls.sum() == 0, f"Found nulls in critical columns: {nulls}")

        # Check 2: no duplicate orders
        duplicates = self.df.duplicated(subset=['order_id']).sum()
        self._check(duplicates == 0, f"Found {duplicates} duplicate order_ids")

        # Check 3: strictly positive amounts (zero is rejected as well)
        self._check((self.df['total_amount'] > 0).all(), "Found non-positive amounts")

        # Check 4: no order dated in the future
        self._check(
            (self.df['order_date'] <= datetime.now()).all(),
            "Found future dates",
        )

        print("✅ All validation checks passed")
        return self

    def load(self, output_path: str):
        """Write ``self.df`` to Parquet, partitioned by order year and month.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        self._require_data()
        print(f"💾 Loading to {output_path}...")

        self.df.to_parquet(
            output_path,
            index=False,
            compression='snappy',
            partition_cols=['order_year', 'order_month']
        )

        print(f"✅ Saved {len(self.df)} rows")
        return self

    def run(self, output_path: str):
        """Run extract -> clean -> transform -> validate -> load.

        Returns ``self``, like every other stage.
        """
        print(f"\n{'='*60}")
        print(f"🚀 Starting Pandas ETL Pipeline")
        print(f"{'='*60}\n")

        self.extract() \
            .clean() \
            .transform() \
            .validate() \
            .load(output_path)

        print(f"\n{'='*60}")
        print(f"✅ Pipeline completed successfully")
        print(f"{'='*60}\n")
        return self

# Usage
# Guarded so that importing this module does not start the ETL run as a
# side effect; the pipeline only runs when the file is executed as a script.
if __name__ == "__main__":
    pipeline = DataPipeline('raw_sales.csv')
    pipeline.run('processed_sales.parquet')

NumPy - Operaciones Numéricas:

import numpy as np

# ===== ARRAYS =====

# Build arrays from Python lists
arr = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([
    [1, 2, 3],
    [4, 5, 6],
])

# Pre-filled arrays
zeros = np.zeros(shape=(3, 4))
ones = np.ones(shape=(2, 3))
identity = np.identity(3)
random = np.random.rand(3, 3)

# Ranges (each assignment rebinds `arr`)
arr = np.arange(0, 10, step=2)  # [0, 2, 4, 6, 8]
arr = np.linspace(0, 1, num=5)  # [0, 0.25, 0.5, 0.75, 1]

# ===== OPERATIONS =====
# The expressions below are shown for their value; results are not stored.

# Element-wise arithmetic
arr = np.array([1, 2, 3])
np.multiply(arr, 2)  # [2, 4, 6]
np.add(arr, arr)  # [2, 4, 6]
np.power(arr, 2)  # [1, 4, 9]

# Aggregations over the whole array
np.sum(arr)
np.mean(arr)
np.std(arr)
np.max(arr)
np.min(arr)

# Axis-wise operations
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
np.sum(arr_2d, axis=0)  # [5, 7, 9] - per column
np.sum(arr_2d, axis=1)  # [6, 15] - per row

# ===== INDEXING AND SLICING =====
# The indexing expressions are shown for their value; results are not stored.

arr = np.array([1, 2, 3, 4, 5])
arr[0]  # 1
arr[-1]  # 5
arr[1:4]  # [2, 3, 4]
above_two = arr > 2
arr[above_two]  # [3, 4, 5] - boolean indexing

# ===== BROADCASTING =====

arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
arr_1d = np.array([10, 20, 30])

# The 1-D row is added to every row of the 2-D array
result = np.add(arr_2d, arr_1d)
# [[11, 22, 33],
#  [14, 25, 36]]

# ===== CASO DE USO: Procesamiento de métricas =====

def calculate_moving_average(data, window):
    """Compute the simple moving average of a 1-D sequence with NumPy.

    Args:
        data: 1-D sequence of numbers.
        window: Number of consecutive points averaged per output value (>= 1).

    Returns:
        Float array of length ``len(data) - window + 1``; empty when the
        window is longer than the data.

    Raises:
        ValueError: if ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    values = np.asarray(data, dtype=float)
    # With mode='valid', np.convolve swaps its operands when the kernel is
    # longer than the data and would return window - len(data) + 1 meaningless
    # values. No full window fits in that case, so return an empty result.
    if window > values.size:
        return np.empty(0, dtype=float)

    weights = np.ones(window) / window
    return np.convolve(values, weights, mode='valid')

# Simulated daily sales: 365 random integers in [100, 500)
sales = np.random.randint(low=100, high=500, size=365)

# 7-day moving average
ma_7 = calculate_moving_average(sales, 7)

# Outlier detection: values more than 2 standard deviations from the mean
mean, std = sales.mean(), sales.std()
is_outlier = (sales < mean - 2*std) | (sales > mean + 2*std)
outliers = sales[is_outlier]