📊 Developer Cookbook - FASE 5: Data Quality

Recetas prácticas para validación, limpieza y calidad de datos


📚 Tabla de Contenidos

  1. Receta 5.8: Data Quality - Validación y Limpieza

Data Quality - Validación y Limpieza

Receta 5.8: Data Quality - Validación y Limpieza

Great Expectations - Framework de validación:

import great_expectations as gx
from great_expectations.core.batch import BatchRequest
import pandas as pd

# ===== SETUP =====

# Inicializar contexto
context = gx.get_context()

# Crear expectation suite
suite = context.create_expectation_suite(
    expectation_suite_name="sales_data_quality",
    overwrite_existing=True
)

# ===== EXPECTATIVAS =====

# Tabla level
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1,
            "max_value": 1000000
        }
    )
)

# Columna existe
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "order_id"}
    )
)

# No nulls
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    )
)

# Valores únicos
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    )
)

# Valores en set
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "completed", "cancelled"]
        }
    )
)

# Rango numérico
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "total_amount",
            "min_value": 0,
            "max_value": 100000
        }
    )
)

# Regex pattern
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "email",
            "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        }
    )
)

# ===== VALIDAR DATOS =====

# Cargar datos
df = pd.read_csv("sales.csv")

# Crear batch
batch = context.sources.add_pandas("sales_source").add_dataframe_asset(
    name="sales_data"
).build_batch_request(dataframe=df)

# Validar
results = context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[batch],
    run_id="manual_run_20240101"
)

# Revisar resultados
if results["success"]:
    print("✅ All validations passed!")
else:
    print("❌ Validations failed:")
    for result in results["run_results"]:
        print(result)

Data Profiling con pandas-profiling:

from ydata_profiling import ProfileReport
import pandas as pd

# Cargar datos
df = pd.read_csv("sales.csv")

# Generar reporte
profile = ProfileReport(
    df,
    title="Sales Data Profile Report",
    explorative=True,
    minimal=False
)

# Guardar reporte HTML
profile.to_file("sales_profile_report.html")

# Ver estadísticas inline
print(profile.description_set)

# Identificar alertas automáticamente
print(profile.get_description()['alerts'])

Detección y manejo de outliers:

import pandas as pd
import numpy as np
from scipy import stats

class OutlierDetector:
    """Detectar y manejar outliers"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.outliers = {}
    
    def detect_iqr(self, column: str, threshold=1.5):
        """Método IQR (Interquartile Range)"""
        Q1 = self.df[column].quantile(0.25)
        Q3 = self.df[column].quantile(0.75)
        IQR = Q3 - Q1
        
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR
        
        outliers = self.df[
            (self.df[column] < lower_bound) | 
            (self.df[column] > upper_bound)
        ]
        
        self.outliers[column] = {
            'method': 'IQR',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def detect_zscore(self, column: str, threshold=3):
        """Método Z-score"""
        z_scores = np.abs(stats.zscore(self.df[column]))
        outliers = self.df[z_scores > threshold]
        
        self.outliers[column] = {
            'method': 'Z-score',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'threshold': threshold,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def detect_isolation_forest(self, columns: list):
        """Método Isolation Forest (ML-based)"""
        from sklearn.ensemble import IsolationForest
        
        iso_forest = IsolationForest(
            contamination=0.1,  # 10% esperado de outliers
            random_state=42
        )
        
        predictions = iso_forest.fit_predict(self.df[columns])
        outliers = self.df[predictions == -1]
        
        self.outliers['multivariate'] = {
            'method': 'Isolation Forest',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'indices': outliers.index.tolist()
        }
        
        return outliers
    
    def remove_outliers(self, column: str, method='iqr'):
        """Remover outliers"""
        if method == 'iqr':
            outlier_indices = self.outliers[column]['indices']
            return self.df.drop(outlier_indices)
    
    def cap_outliers(self, column: str):
        """Cap outliers (winsorization)"""
        info = self.outliers[column]
        
        self.df[column] = self.df[column].clip(
            lower=info['lower_bound'],
            upper=info['upper_bound']
        )
        
        return self.df
    
    def summary(self):
        """Resumen de outliers detectados"""
        for col, info in self.outliers.items():
            print(f"\n=== {col} ===")
            print(f"Method: {info['method']}")
            print(f"Count: {info['count']}")
            print(f"Percentage: {info['percentage']:.2f}%")

# Uso
df = pd.read_csv("sales.csv")
detector = OutlierDetector(df)

# Detectar con diferentes métodos
detector.detect_iqr('total_amount')
detector.detect_zscore('total_amount')

# Ver resumen
detector.summary()

# Remover outliers
df_clean = detector.remove_outliers('total_amount', method='iqr')

Data Cleaning Pipeline completo:

import pandas as pd
import numpy as np
from datetime import datetime

class DataCleaner:
    """Pipeline de limpieza de datos completo"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.cleaning_log = []
    
    def log(self, message: str):
        """Log de operaciones"""
        self.cleaning_log.append({
            'timestamp': datetime.now(),
            'message': message
        })
        print(f"  {message}")
    
    def handle_missing_values(self, strategies: dict):
        """
        Manejar valores faltantes
        
        strategies = {
            'column1': 'drop',
            'column2': 'mean',
            'column3': 'mode',
            'column4': 'forward_fill',
            'column5': 0
        }
        """
        print("\n🧹 Handling missing values...")
        
        for col, strategy in strategies.items():
            if col not in self.df.columns:
                continue
            
            missing_count = self.df[col].isnull().sum()
            if missing_count == 0:
                continue
            
            if strategy == 'drop':
                before = len(self.df)
                self.df = self.df.dropna(subset=[col])
                self.log(f"Dropped {before - len(self.df)} rows with null {col}")
            
            elif strategy == 'mean':
                mean_val = self.df[col].mean()
                self.df[col].fillna(mean_val, inplace=True)
                self.log(f"Filled {missing_count} nulls in {col} with mean ({mean_val:.2f})")
            
            elif strategy == 'median':
                median_val = self.df[col].median()
                self.df[col].fillna(median_val, inplace=True)
                self.log(f"Filled {missing_count} nulls in {col} with median ({median_val:.2f})")
            
            elif strategy == 'mode':
                mode_val = self.df[col].mode()[0]
                self.df[col].fillna(mode_val, inplace=True)
                self.log(f"Filled {missing_count} nulls in {col} with mode ({mode_val})")
            
            elif strategy == 'forward_fill':
                self.df[col].fillna(method='ffill', inplace=True)
                self.log(f"Forward filled {missing_count} nulls in {col}")
            
            else:
                # Valor constante
                self.df[col].fillna(strategy, inplace=True)
                self.log(f"Filled {missing_count} nulls in {col} with {strategy}")
        
        return self
    
    def remove_duplicates(self, subset=None, keep='first'):
        """Remover duplicados"""
        print("\n🔍 Removing duplicates...")
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        
        if removed > 0:
            self.log(f"Removed {removed} duplicate rows")
        else:
            self.log("No duplicates found")
        
        return self
    
    def standardize_text(self, columns: list):
        """Estandarizar columnas de texto"""
        print("\n✏️ Standardizing text...")
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            # Strip whitespace
            self.df[col] = self.df[col].str.strip()
            
            # Lowercase
            self.df[col] = self.df[col].str.lower()
            
            # Remove extra spaces
            self.df[col] = self.df[col].str.replace(r'\s+', ' ', regex=True)
            
            self.log(f"Standardized text in {col}")
        
        return self
    
    def fix_data_types(self, type_map: dict):
        """Corregir tipos de datos"""
        print("\n🔧 Fixing data types...")
        
        for col, dtype in type_map.items():
            if col not in self.df.columns:
                continue
            
            try:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col])
                else:
                    self.df[col] = self.df[col].astype(dtype)
                
                self.log(f"Converted {col} to {dtype}")
            except Exception as e:
                self.log(f"Failed to convert {col} to {dtype}: {str(e)}")
        
        return self
    
    def remove_outliers(self, columns: list, method='iqr', threshold=1.5):
        """Remover outliers"""
        print(f"\n📊 Removing outliers using {method} method...")
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            before = len(self.df)
            
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                
                self.df = self.df[
                    (self.df[col] >= lower) & 
                    (self.df[col] <= upper)
                ]
            
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} outliers from {col}")
        
        return self
    
    def validate_ranges(self, validations: dict):
        """Validar rangos de valores"""
        print("\n✓ Validating value ranges...")
        
        for col, (min_val, max_val) in validations.items():
            if col not in self.df.columns:
                continue
            
            before = len(self.df)
            self.df = self.df[
                (self.df[col] >= min_val) & 
                (self.df[col] <= max_val)
            ]
            
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} rows with {col} outside [{min_val}, {max_val}]")
        
        return self
    
    def get_cleaned_data(self):
        """Retornar datos limpios"""
        return self.df
    
    def get_cleaning_report(self):
        """Generar reporte de limpieza"""
        return pd.DataFrame(self.cleaning_log)

# ===== USO =====

# Cargar datos
df = pd.read_csv("raw_sales.csv")

# Pipeline de limpieza
cleaner = DataCleaner(df)

df_clean = cleaner \
    .handle_missing_values({
        'customer_id': 'drop',
        'total_amount': 'median',
        'discount': 0,
        'category': 'mode'
    }) \
    .remove_duplicates(subset=['order_id']) \
    .standardize_text(['customer_email', 'customer_name']) \
    .fix_data_types({
        'order_date': 'datetime',
        'total_amount': 'float64',
        'quantity': 'int32'
    }) \
    .validate_ranges({
        'total_amount': (0, 100000),
        'quantity': (1, 1000)
    }) \
    .remove_outliers(['total_amount'], method='iqr') \
    .get_cleaned_data()

# Ver reporte
report = cleaner.get_cleaning_report()
print("\n" + "="*60)
print("CLEANING REPORT")
print("="*60)
print(report)

# Guardar datos limpios
df_clean.to_parquet('clean_sales.parquet', index=False)