Data Quality: Trust But Verify
Models are only as good as their data. A single upstream schema change, a broken ETL pipeline, or a subtle shift in data distribution can silently degrade model performance. Data quality checks act as automated guardrails that catch problems before they reach your model.
This lesson covers Great Expectations (the industry standard for data validation), schema enforcement, anomaly detection, drift monitoring, and data contracts.
The Data Quality Iceberg
Great Expectations
Great Expectations (GX) is the leading open-source framework for data validation. It lets you define expectations (assertions about your data) and automatically validate datasets against them.
Core Concepts
| Concept | Description |
|---|---|
| Expectation | A single assertion about the data (e.g., "column X has no nulls") |
| Expectation Suite | A collection of expectations for a dataset |
| Validator | Applies expectations to actual data and reports results |
| Checkpoint | Automates validation as part of a pipeline |
| Data Docs | Auto-generated HTML documentation of your expectations and results |
Common Expectations
# Column-level expectations
expect_column_to_exist("user_id")
expect_column_values_to_not_be_null("user_id")
expect_column_values_to_be_unique("user_id")
expect_column_values_to_be_between("age", min_value=0, max_value=150)
expect_column_values_to_be_in_set("status", ["active", "inactive"])
expect_column_mean_to_be_between("price", min_value=10, max_value=100)

# Table-level expectations
expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
expect_table_columns_to_match_set(["id", "name", "value"])
import numpy as np
import pandas as pd

# === Great Expectations Simulation ===
# (Full GX requires installation; this simulates the concepts)
class Expectation:
    """A single data quality expectation.

    Bundles a check function with its keyword parameters so it can be
    applied to a DataFrame and always yields a structured result dict.
    """

    def __init__(self, name, check_fn, params=None):
        self.name = name          # human-readable label used in reports
        self.check_fn = check_fn  # callable(df, **params) -> (bool, str)
        self.params = params or {}

    def validate(self, df):
        """Apply the check to *df*; a crashing check is reported, not raised."""
        try:
            # The check returns (success flag, human-readable details).
            success, details = self.check_fn(df, **self.params)
        except Exception as e:
            # Treat an exception inside a check as a failed expectation so
            # one broken check cannot abort the whole suite run.
            return {
                "expectation": self.name,
                "success": False,
                "details": f"Error: {str(e)}",
            }
        return {
            "expectation": self.name,
            "success": success,
            "details": details,
        }
28
class ExpectationSuite:
    """A named collection of expectations validated together."""

    def __init__(self, name):
        self.name = name
        self.expectations = []

    def add(self, expectation):
        """Register one Expectation with the suite."""
        self.expectations.append(expectation)

    def validate(self, df):
        """Run every expectation against *df* and summarize pass/fail."""
        results = [exp.validate(df) for exp in self.expectations]
        n_pass = sum(1 for r in results if r["success"])
        # Every result is either a pass or a fail, so the fail count
        # follows directly from the pass count.
        n_fail = len(results) - n_pass
        return {
            "suite": self.name,
            "passed": n_pass,
            "failed": n_fail,
            "total": len(results),
            "success": n_fail == 0,
            "results": results,
        }
52
# --- Define expectation factory functions ---
def expect_column_exists(df, column):
    """Check that *column* is present in the DataFrame."""
    present = column in df.columns
    return present, f"Column '{column}' {'exists' if present else 'missing'}"
56
def expect_no_nulls(df, column):
    """Check that *column* contains no NaN/None entries."""
    n_null = df[column].isna().sum()
    return n_null == 0, f"{n_null} nulls found"
60
def expect_unique(df, column):
    """Check that *column* has no repeated values.

    ``duplicated()`` marks every occurrence after the first, so the
    reported count is "extra" rows, not distinct duplicated values.
    """
    n_dupes = df[column].duplicated().sum()
    return n_dupes == 0, f"{n_dupes} duplicates found"
64
def expect_between(df, column, min_val, max_val):
    """Check that every value in *column* lies within [min_val, max_val].

    NOTE(review): NaN compares False on both sides, so null entries pass
    this check silently — pair with expect_no_nulls to catch them.
    """
    outside = df[column].lt(min_val) | df[column].gt(max_val)
    violations = outside.sum()
    return violations == 0, f"{violations} values out of [{min_val}, {max_val}]"
68
def expect_in_set(df, column, values):
    """Check that every value in *column* belongs to the allowed set."""
    # Count violations once instead of re-evaluating the mask per use.
    n_invalid = (~df[column].isin(values)).sum()
    return n_invalid == 0, f"{n_invalid} values not in {values}"
72
def expect_row_count(df, min_rows, max_rows):
    """Check that the table's row count falls in the inclusive range."""
    n_rows = len(df)
    within = min_rows <= n_rows <= max_rows
    return within, f"Row count: {n_rows} (expected [{min_rows}, {max_rows}])"
76
def expect_mean_between(df, column, min_val, max_val):
    """Check that the mean of *column* lies within [min_val, max_val]."""
    col_mean = df[column].mean()
    return min_val <= col_mean <= max_val, f"Mean={col_mean:.2f} (expected [{min_val}, {max_val}])"
80
# === Build a validation suite ===
suite = ExpectationSuite("customer_data_quality")

# (name, check function, parameters) triple for every data-contract rule.
_suite_checks = [
    ("column_exists: customer_id", expect_column_exists,
     {"column": "customer_id"}),
    ("no_nulls: customer_id", expect_no_nulls,
     {"column": "customer_id"}),
    ("unique: customer_id", expect_unique,
     {"column": "customer_id"}),
    ("between: age [0, 120]", expect_between,
     {"column": "age", "min_val": 0, "max_val": 120}),
    ("in_set: status", expect_in_set,
     {"column": "status", "values": ["active", "inactive", "churned"]}),
    ("row_count [100, 10000]", expect_row_count,
     {"min_rows": 100, "max_rows": 10000}),
    ("mean: purchase_amount [10, 200]", expect_mean_between,
     {"column": "purchase_amount", "min_val": 10, "max_val": 200}),
]
for _name, _fn, _params in _suite_checks:
    suite.add(Expectation(_name, _fn, _params))
99
# --- Test with GOOD data ---
np.random.seed(42)
good_data = pd.DataFrame({
    "customer_id": range(500),  # sequential -> unique, non-null
    "age": np.random.randint(18, 80, 500),  # well inside [0, 120]
    "status": np.random.choice(["active", "inactive", "churned"], 500),
    "purchase_amount": np.random.lognormal(3.5, 0.8, 500),
})

print("=== Validating GOOD data ===")
result = suite.validate(good_data)
print(f"Suite: {result['suite']}")
print(f"Result: {'PASS' if result['success'] else 'FAIL'}")
print(f"Passed: {result['passed']}/{result['total']}")
for r in result["results"]:
    status = "PASS" if r["success"] else "FAIL"
    print(f" [{status}] {r['expectation']}: {r['details']}")
117
# --- Test with BAD data: inject four violations into a copy ---
bad_data = good_data.copy()
bad_data.loc[10, "age"] = -5            # Invalid age (below 0)
bad_data.loc[20, "age"] = 999           # Invalid age (above 120)
bad_data.loc[30, "status"] = "unknown"  # Invalid status value
bad_data.loc[40, "customer_id"] = 0     # Duplicate ID (0 already exists)

print("\n=== Validating BAD data ===")
result = suite.validate(bad_data)
print(f"Result: {'PASS' if result['success'] else 'FAIL'}")
print(f"Passed: {result['passed']}/{result['total']}")
for r in result["results"]:
    status = "PASS" if r["success"] else "FAIL"
    print(f" [{status}] {r['expectation']}: {r['details']}")

Data Drift Monitoring
Data drift occurs when the statistical properties of input data change over time, causing model performance to degrade.
Types of Drift
| Type | What changes | Detection |
|---|---|---|
| Covariate drift | Input feature distributions | Statistical tests on features |
| Label drift | Target variable distribution | Monitor prediction distribution |
| Concept drift | Relationship between features and target | Monitor model performance metrics |
Detection Methods
Data Contracts
Data contracts are formal agreements between data producers and consumers that define the expected schema, semantics, freshness, and quality guarantees of shared data.
Data contracts prevent the "my upstream broke your downstream" problem by making data quality a shared responsibility with clear expectations.
import numpy as np
import pandas as pd
from scipy import stats

# Fix the global RNG so the simulated drift scenario is reproducible.
np.random.seed(42)

# === Data Drift Detection ===
8
9def compute_psi(reference, current, n_bins=10):
10 """Population Stability Index between two distributions."""
11 # Create bins from reference distribution
12 bins = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
13 bins[0] = -np.inf
14 bins[-1] = np.inf
15
16 ref_counts = np.histogram(reference, bins=bins)[0]
17 cur_counts = np.histogram(current, bins=bins)[0]
18
19 # Add small constant to avoid division by zero
20 ref_pct = (ref_counts + 1) / (len(reference) + n_bins)
21 cur_pct = (cur_counts + 1) / (len(current) + n_bins)
22
23 psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
24 return psi
25
def detect_drift(reference_df, current_df, feature_names,
                 psi_threshold=0.2, ks_alpha=0.05):
    """Detect distribution drift for each named feature.

    A feature is flagged as drifted when its PSI exceeds *psi_threshold*
    OR the two-sample KS test rejects "same distribution" at *ks_alpha*.

    Parameters
    ----------
    reference_df : pandas.DataFrame
        Baseline data (e.g. the training distribution).
    current_df : pandas.DataFrame
        Recent data to compare against the baseline.
    feature_names : iterable of str
        Columns to test; must exist in both frames.
    psi_threshold : float
        PSI above this value counts as drift (0.2 is the common cutoff).
    ks_alpha : float
        Significance level for the KS test.

    Returns
    -------
    pandas.DataFrame
        One row per feature: ks_stat, ks_pval, psi, ref_mean, cur_mean,
        mean_shift, and a boolean ``drift`` flag.
    """
    results = []
    for feat in feature_names:
        # Nulls carry no distributional information for these tests.
        ref = reference_df[feat].dropna().values
        cur = current_df[feat].dropna().values

        # KS test: sensitive to any difference between the two CDFs.
        ks_stat, ks_pval = stats.ks_2samp(ref, cur)

        # PSI on reference-derived percentile bins.
        psi = compute_psi(ref, cur)

        # Summary stats comparison (std was previously computed here but
        # never reported; removed as dead code).
        ref_mean = ref.mean()
        cur_mean = cur.mean()

        drift_detected = (psi > psi_threshold) or (ks_pval < ks_alpha)

        results.append({
            "feature": feat,
            "ks_stat": ks_stat,
            "ks_pval": ks_pval,
            "psi": psi,
            "ref_mean": ref_mean,
            "cur_mean": cur_mean,
            "mean_shift": cur_mean - ref_mean,
            "drift": drift_detected,
        })

    return pd.DataFrame(results)
58
# --- Simulate drift scenarios ---
n_reference = 5000
n_current = 5000

# Reference data (training distribution). Column draw order is kept
# stable so results are reproducible under the fixed global seed.
reference = pd.DataFrame({
    "age": np.random.normal(35, 10, n_reference),
    "income": np.random.lognormal(10.5, 0.5, n_reference),
    "score": np.random.beta(2, 5, n_reference) * 100,
    "days_active": np.random.exponential(30, n_reference),
})

# Current data WITH drift injected into two of the four features.
current = pd.DataFrame({
    "age": np.random.normal(40, 12, n_current),           # Mean shifted
    "income": np.random.lognormal(10.5, 0.5, n_current),  # No drift
    "score": np.random.beta(5, 2, n_current) * 100,       # Distribution flipped
    "days_active": np.random.exponential(30, n_current),  # No drift
})

# Run drift detection on every feature.
features = ["age", "income", "score", "days_active"]
drift_results = detect_drift(reference, current, features)
82
# Render a fixed-width drift report table.
print("=== Data Drift Report ===")
header = (f"{'Feature':<15} {'PSI':>8} {'KS stat':>8} {'KS p-val':>10} "
          f"{'Mean Shift':>12} {'Drift?':>8}")
print(header)
print("-" * 65)
for _, row in drift_results.iterrows():
    flag = "YES ***" if row["drift"] else "no"
    line = (f"{row['feature']:<15} {row['psi']:>8.4f} {row['ks_stat']:>8.4f} "
            f"{row['ks_pval']:>10.4f} {row['mean_shift']:>+12.2f} "
            f"{flag:>8}")
    print(line)

# Summarize which features tripped the drift flag.
drifted = drift_results[drift_results["drift"]]
print(f"\nDrifted features: {list(drifted['feature'])}")
95
# PSI interpretation: map each feature's PSI onto the conventional
# 0.1 / 0.2 severity bands.
print("\nPSI Interpretation:")
for _, row in drift_results.iterrows():
    psi = row["psi"]
    if psi < 0.1:
        level = "No significant drift"
    elif psi < 0.2:
        level = "Moderate drift - monitor closely"
    else:
        level = "Significant drift - investigate immediately"
    print(f" {row['feature']}: PSI={psi:.4f} -> {level}")