📊 Developer Cookbook - PHASE 5: Data Quality
Practical recipes for data validation, cleaning, and quality
📚 Table of Contents
Data Quality - Validation and Cleaning
Recipe 5.8: Data Quality - Validation and Cleaning
Great Expectations - validation framework:
import great_expectations as gx
import pandas as pd

# ===== SETUP =====
# Initialize context
context = gx.get_context()

# Create the expectation suite (in GX 0.16+ this replaces the older
# create_expectation_suite(..., overwrite_existing=True))
suite = context.add_or_update_expectation_suite(
    expectation_suite_name="sales_data_quality"
)
# ===== EXPECTATIONS =====
# Table level
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1,
            "max_value": 1000000
        }
    )
)

# Column exists
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_to_exist",
        kwargs={"column": "order_id"}
    )
)

# No nulls
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    )
)

# Unique values
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    )
)

# Values restricted to a fixed set
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "completed", "cancelled"]
        }
    )
)

# Numeric range
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "total_amount",
            "min_value": 0,
            "max_value": 100000
        }
    )
)

# Regex pattern
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "email",
            "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        }
    )
)
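# Most expectations also accept a "mostly" kwarg to tolerate a fraction of
# bad rows, e.g. require the email regex to hold on at least 95% of values
# (the 0.95 threshold here is illustrative):
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_match_regex",
        kwargs={
            "column": "email",
            "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
            "mostly": 0.95
        }
    )
)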
# Persist the updated suite so validators pick it up
context.add_or_update_expectation_suite(expectation_suite=suite)

# ===== VALIDATE DATA =====
# Load data
df = pd.read_csv("sales.csv")

# Create a batch request (fluent datasource API, GX 0.16+)
batch_request = (
    context.sources.add_pandas("sales_source")
    .add_dataframe_asset(name="sales_data")
    .build_batch_request(dataframe=df)
)

# Validate the batch against the suite
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="sales_data_quality"
)
results = validator.validate()

# Review results
if results["success"]:
    print("✅ All validations passed!")
else:
    print("❌ Validations failed:")
    for result in results["results"]:
        if not result["success"]:
            print(result)
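For scheduled runs, validations are usually wrapped in a Checkpoint, which records results and feeds Data Docs. A minimal sketch against the suite above, assuming the GX 0.16/0.17 fluent API (the checkpoint name is illustrative, and the Checkpoint interface changed again in GX 1.0, so match your installed version):

# Checkpoint sketch (GX 0.16/0.17; "sales_checkpoint" is an illustrative name)
checkpoint = context.add_or_update_checkpoint(
    name="sales_checkpoint",
    validations=[{
        "batch_request": batch_request,
        "expectation_suite_name": "sales_data_quality"
    }]
)

# Run it and inspect the aggregate result
checkpoint_result = checkpoint.run()
print(checkpoint_result.success)

# Build the HTML Data Docs that summarize every validation run
context.build_data_docs()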
Data profiling with ydata-profiling (the maintained successor to pandas-profiling):
from ydata_profiling import ProfileReport
import pandas as pd

# Load data
df = pd.read_csv("sales.csv")

# Generate the report
profile = ProfileReport(
    df,
    title="Sales Data Profile Report",
    explorative=True,
    minimal=False
)

# Save the HTML report
profile.to_file("sales_profile_report.html")

# Inspect the computed statistics inline
description = profile.get_description()
print(description)

# Automatically flagged alerts (high cardinality, missing values, correlations, ...)
print(description.alerts)
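For large datasets, minimal=True skips the expensive correlation and interaction computations, and ydata-profiling 4.x can also diff two reports to spot drift between snapshots. A brief sketch (sales_last_month.csv is an illustrative file name):

# Lightweight profile: disables heavy computations for big frames
profile_fast = ProfileReport(df, title="Sales (minimal)", minimal=True)
profile_fast.to_file("sales_profile_minimal.html")

# Compare two snapshots to spot drift (ydata-profiling 4.x)
df_old = pd.read_csv("sales_last_month.csv")  # illustrative file name
profile_old = ProfileReport(df_old, title="Sales (last month)")
comparison = profile.compare(profile_old)
comparison.to_file("sales_profile_comparison.html")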
Outlier detection and handling:
import pandas as pd
import numpy as np
from scipy import stats

class OutlierDetector:
    """Detect and handle outliers"""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.outliers = {}

    def detect_iqr(self, column: str, threshold=1.5):
        """IQR (interquartile range) method"""
        Q1 = self.df[column].quantile(0.25)
        Q3 = self.df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR
        outliers = self.df[
            (self.df[column] < lower_bound) |
            (self.df[column] > upper_bound)
        ]
        self.outliers[column] = {
            'method': 'IQR',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'indices': outliers.index.tolist()
        }
        return outliers

    def detect_zscore(self, column: str, threshold=3):
        """Z-score method"""
        # nan_policy='omit' keeps NaNs from poisoning every score
        z_scores = np.abs(stats.zscore(self.df[column], nan_policy='omit'))
        outliers = self.df[z_scores > threshold]
        self.outliers[column] = {
            'method': 'Z-score',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'threshold': threshold,
            'indices': outliers.index.tolist()
        }
        return outliers

    def detect_isolation_forest(self, columns: list):
        """Isolation Forest method (ML-based, multivariate)"""
        from sklearn.ensemble import IsolationForest
        iso_forest = IsolationForest(
            contamination=0.1,  # expected share of outliers: 10%
            random_state=42
        )
        predictions = iso_forest.fit_predict(self.df[columns])
        outliers = self.df[predictions == -1]
        self.outliers['multivariate'] = {
            'method': 'Isolation Forest',
            'count': len(outliers),
            'percentage': len(outliers) / len(self.df) * 100,
            'indices': outliers.index.tolist()
        }
        return outliers

    def remove_outliers(self, column: str):
        """Drop the rows flagged by the most recent detection for this column"""
        outlier_indices = self.outliers[column]['indices']
        return self.df.drop(outlier_indices)

    def cap_outliers(self, column: str):
        """Cap outliers (winsorization); run detect_iqr first to set the bounds"""
        info = self.outliers[column]
        self.df[column] = self.df[column].clip(
            lower=info['lower_bound'],
            upper=info['upper_bound']
        )
        return self.df

    def summary(self):
        """Summary of detected outliers"""
        for col, info in self.outliers.items():
            print(f"\n=== {col} ===")
            print(f"Method: {info['method']}")
            print(f"Count: {info['count']}")
            print(f"Percentage: {info['percentage']:.2f}%")

# Usage
df = pd.read_csv("sales.csv")
detector = OutlierDetector(df)

# Detect with different methods (each call overwrites the entry for the column)
detector.detect_iqr('total_amount')
detector.detect_zscore('total_amount')

# View summary
detector.summary()

# Remove outliers flagged by the last detection on this column
df_clean = detector.remove_outliers('total_amount')
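The z-score method assumes roughly normal data and is itself skewed by the very outliers it is hunting. A robust alternative is the modified z-score based on the median absolute deviation (MAD). A minimal sketch of a method that could be added to OutlierDetector (detect_mad is not part of the class above; the 3.5 cutoff follows the common Iglewicz-Hoaglin convention):

def detect_mad(self, column: str, threshold=3.5):
    """Modified z-score via median absolute deviation (robust to extreme values)"""
    median = self.df[column].median()
    mad = (self.df[column] - median).abs().median()
    if mad == 0:
        raise ValueError(f"MAD is zero for {column}; modified z-score undefined")
    # 0.6745 (the 75th percentile of the standard normal) rescales MAD
    # so the score is comparable to a standard deviation
    modified_z = 0.6745 * (self.df[column] - median) / mad
    outliers = self.df[modified_z.abs() > threshold]
    self.outliers[column] = {
        'method': 'MAD',
        'count': len(outliers),
        'percentage': len(outliers) / len(self.df) * 100,
        'threshold': threshold,
        'indices': outliers.index.tolist()
    }
    return outliers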
Complete data cleaning pipeline:
import pandas as pd
from datetime import datetime

class DataCleaner:
    """Complete data cleaning pipeline"""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.cleaning_log = []

    def log(self, message: str):
        """Record each operation with a timestamp"""
        self.cleaning_log.append({
            'timestamp': datetime.now(),
            'message': message
        })
        print(f"  {message}")
    def handle_missing_values(self, strategies: dict):
        """
        Handle missing values.
        strategies = {
            'column1': 'drop',
            'column2': 'mean',
            'column3': 'mode',
            'column4': 'forward_fill',
            'column5': 0          # any other value is used as a constant fill
        }
        """
        print("\n🧹 Handling missing values...")
        for col, strategy in strategies.items():
            if col not in self.df.columns:
                continue
            missing_count = self.df[col].isnull().sum()
            if missing_count == 0:
                continue
            if strategy == 'drop':
                before = len(self.df)
                self.df = self.df.dropna(subset=[col])
                self.log(f"Dropped {before - len(self.df)} rows with null {col}")
            elif strategy == 'mean':
                mean_val = self.df[col].mean()
                self.df[col] = self.df[col].fillna(mean_val)
                self.log(f"Filled {missing_count} nulls in {col} with mean ({mean_val:.2f})")
            elif strategy == 'median':
                median_val = self.df[col].median()
                self.df[col] = self.df[col].fillna(median_val)
                self.log(f"Filled {missing_count} nulls in {col} with median ({median_val:.2f})")
            elif strategy == 'mode':
                mode_val = self.df[col].mode()[0]
                self.df[col] = self.df[col].fillna(mode_val)
                self.log(f"Filled {missing_count} nulls in {col} with mode ({mode_val})")
            elif strategy == 'forward_fill':
                self.df[col] = self.df[col].ffill()
                self.log(f"Forward filled {missing_count} nulls in {col}")
            else:
                # Constant value
                self.df[col] = self.df[col].fillna(strategy)
                self.log(f"Filled {missing_count} nulls in {col} with {strategy}")
        return self
    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows"""
        print("\n🔍 Removing duplicates...")
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        if removed > 0:
            self.log(f"Removed {removed} duplicate rows")
        else:
            self.log("No duplicates found")
        return self

    def standardize_text(self, columns: list):
        """Standardize text columns"""
        print("\n✏️ Standardizing text...")
        for col in columns:
            if col not in self.df.columns:
                continue
            # Strip surrounding whitespace
            self.df[col] = self.df[col].str.strip()
            # Lowercase
            self.df[col] = self.df[col].str.lower()
            # Collapse repeated internal whitespace
            self.df[col] = self.df[col].str.replace(r'\s+', ' ', regex=True)
            self.log(f"Standardized text in {col}")
        return self
    def fix_data_types(self, type_map: dict):
        """Fix column data types"""
        print("\n🔧 Fixing data types...")
        for col, dtype in type_map.items():
            if col not in self.df.columns:
                continue
            try:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col])
                else:
                    self.df[col] = self.df[col].astype(dtype)
                self.log(f"Converted {col} to {dtype}")
            except Exception as e:
                self.log(f"Failed to convert {col} to {dtype}: {str(e)}")
        return self

    def remove_outliers(self, columns: list, method='iqr', threshold=1.5):
        """Remove outliers"""
        print(f"\n📊 Removing outliers using {method} method...")
        for col in columns:
            if col not in self.df.columns:
                continue
            before = len(self.df)
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                self.df = self.df[
                    (self.df[col] >= lower) &
                    (self.df[col] <= upper)
                ]
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} outliers from {col}")
        return self
    def validate_ranges(self, validations: dict):
        """Validate value ranges"""
        print("\n✓ Validating value ranges...")
        for col, (min_val, max_val) in validations.items():
            if col not in self.df.columns:
                continue
            before = len(self.df)
            self.df = self.df[
                (self.df[col] >= min_val) &
                (self.df[col] <= max_val)
            ]
            removed = before - len(self.df)
            if removed > 0:
                self.log(f"Removed {removed} rows with {col} outside [{min_val}, {max_val}]")
        return self

    def get_cleaned_data(self):
        """Return the cleaned dataframe"""
        return self.df

    def get_cleaning_report(self):
        """Return the cleaning log as a dataframe"""
        return pd.DataFrame(self.cleaning_log)
# ===== USAGE =====
# Load data
df = pd.read_csv("raw_sales.csv")

# Cleaning pipeline
cleaner = DataCleaner(df)
df_clean = (
    cleaner
    .handle_missing_values({
        'customer_id': 'drop',
        'total_amount': 'median',
        'discount': 0,
        'category': 'mode'
    })
    .remove_duplicates(subset=['order_id'])
    .standardize_text(['customer_email', 'customer_name'])
    .fix_data_types({
        'order_date': 'datetime',
        'total_amount': 'float64',
        'quantity': 'int32'
    })
    .validate_ranges({
        'total_amount': (0, 100000),
        'quantity': (1, 1000)
    })
    .remove_outliers(['total_amount'], method='iqr')
    .get_cleaned_data()
)

# View the report
report = cleaner.get_cleaning_report()
print("\n" + "=" * 60)
print("CLEANING REPORT")
print("=" * 60)
print(report)

# Save the clean data
df_clean.to_parquet('clean_sales.parquet', index=False)
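A few cheap assertions after the pipeline catch regressions before the clean file ships downstream. A minimal sketch reusing the column names from the pipeline above:

# Post-cleaning sanity checks (column names assumed from the pipeline above)
assert df_clean['order_id'].is_unique, "order_id should be unique after dedup"
assert df_clean['customer_id'].notna().all(), "customer_id should have no nulls"
assert df_clean['total_amount'].between(0, 100000).all(), "total_amount out of range"
assert pd.api.types.is_datetime64_any_dtype(df_clean['order_date']), "order_date is not datetime"
print(f"✅ Sanity checks passed on {len(df_clean):,} rows")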