Production Recommender Systems
Building a recommendation model is one thing. Deploying it to serve millions of users in real time is a fundamentally different challenge. Production recommender systems are complex engineering systems with multiple stages.
The Multi-Stage Architecture
Real-world recommendation pipelines use a funnel approach:
Full Item Catalog (millions)
|
[Candidate Generation] ← Fast, coarse (ANN retrieval, rule-based)
|
Candidates (~1000)
|
[Ranking] ← Accurate, slower (deep model with many features)
|
Top ranked (~100)
|
[Re-ranking] ← Business logic, diversity, freshness
|
Final results (~10-50)
|
User sees recommendations
Stage 1: Candidate Generation
Stage 2: Ranking
Stage 3: Re-ranking
import numpy as np
from collections import defaultdict


class RecommendationPipeline:
    """
    Multi-stage recommendation pipeline:
    Candidate Generation -> Ranking -> Re-ranking

    Parameters
    ----------
    n_items : int
        Size of the full item catalog.
    item_features : np.ndarray
        (n_items, n_features) content feature matrix.
    item_popularity : np.ndarray
        (n_items,) popularity scores (assumed scaled to [0, 1]).
    """

    def __init__(self, n_items, item_features, item_popularity):
        self.n_items = n_items
        self.item_features = item_features      # (n_items, n_features)
        self.item_popularity = item_popularity  # (n_items,) popularity scores
        # NOTE(review): never read anywhere in this class; kept for
        # compatibility with any external code that sets it.
        self.item_categories = None

    def candidate_generation(self, user_profile, n_candidates=100):
        """
        Stage 1: fast candidate retrieval via cosine similarity + popularity.

        Returns
        -------
        (candidate_ids, blended_scores) : top n_candidates items (fewer if
        the catalog is smaller), sorted by descending blended score.
        """
        # Cosine similarity between every item and the user profile.
        scores = self.item_features @ user_profile
        scores = scores / (np.linalg.norm(self.item_features, axis=1) *
                           np.linalg.norm(user_profile) + 1e-8)

        # Blend with popularity so broadly-liked items can surface (exploration).
        blended = 0.7 * scores + 0.3 * self.item_popularity
        top_candidates = np.argsort(-blended)[:n_candidates]
        return top_candidates, blended[top_candidates]

    def ranking(self, candidate_ids, candidate_scores, user_context):
        """
        Stage 2: re-score candidates with richer features.

        Simulates a deep ranking model: "freshness_weight" and "time_weight"
        from user_context scale random feature stand-ins. Missing keys
        default to 0, which makes this stage deterministic.
        """
        detailed_scores = []
        for idx, item_id in enumerate(candidate_ids):
            base_score = candidate_scores[idx]
            # Simulated feature-based adjustments (stand-ins for a real model).
            freshness_bonus = user_context.get("freshness_weight", 0) * np.random.random()
            context_score = user_context.get("time_weight", 0) * np.random.random()
            final_score = base_score + 0.1 * freshness_bonus + 0.05 * context_score
            detailed_scores.append(final_score)

        detailed_scores = np.array(detailed_scores)
        order = np.argsort(-detailed_scores)
        return candidate_ids[order], detailed_scores[order]

    def re_ranking(self, ranked_ids, ranked_scores, categories, n_final=10,
                   max_per_category=3):
        """
        Stage 3: apply diversity constraints and business rules.

        Walks the ranked list in order, keeping at most max_per_category
        items per category, until n_final items are collected.

        Returns
        -------
        list of (item_id, score, category) tuples.
        """
        final_items = []
        category_counts = defaultdict(int)

        for item_id, score in zip(ranked_ids, ranked_scores):
            cat = categories.get(item_id, "unknown")
            if category_counts[cat] >= max_per_category:
                continue  # Skip if category quota is full
            final_items.append((item_id, score, cat))
            category_counts[cat] += 1
            if len(final_items) >= n_final:
                break

        return final_items

    def recommend(self, user_profile, user_context, categories, n_final=10,
                  n_candidates=50):
        """
        Full pipeline: candidate generation -> ranking -> re-ranking.

        n_candidates controls the stage-1 retrieval depth (was previously
        hard-coded to 50; the default preserves that behavior).
        """
        # Stage 1
        candidates, scores = self.candidate_generation(
            user_profile, n_candidates=n_candidates)
        # Stage 2
        ranked_ids, ranked_scores = self.ranking(candidates, scores, user_context)
        # Stage 3
        return self.re_ranking(ranked_ids, ranked_scores, categories, n_final)
# Demo
np.random.seed(42)
n_items = 1000
n_features = 20

# Unit-norm content features; popularity scaled so the max is 1.0.
item_features = np.random.randn(n_items, n_features)
item_features /= np.linalg.norm(item_features, axis=1, keepdims=True)
popularity = np.random.exponential(0.3, n_items)
popularity /= popularity.max()

# Five rotating categories across the catalog.
categories = {i: f"cat_{i % 5}" for i in range(n_items)}

pipeline = RecommendationPipeline(n_items, item_features, popularity)

user_profile = np.random.randn(n_features)
user_context = {"freshness_weight": 0.5, "time_weight": 0.3}

results = pipeline.recommend(user_profile, user_context, categories, n_final=10)

print("Final recommendations:")
print(f"{'Item ID':<10} {'Score':<10} {'Category'}")
print("-" * 35)
for item_id, score, cat in results:
    print(f"{item_id:<10} {score:<10.4f} {cat}")

# Show diversity
cats = [r[2] for r in results]
print(f"\nCategories represented: {len(set(cats))} / {len(cats)}")

A/B Testing for Recommendations
Offline metrics (RMSE, AUC, NDCG) are useful for development but can be misleading about real-world impact. A/B testing (online experimentation) is essential.
How It Works
1. Split live traffic randomly into control (current system) and treatment (new model)
2. Run both systems simultaneously for a fixed period (1-4 weeks)
3. Measure business metrics: CTR, engagement time, conversion rate, revenue
4. Use statistical tests to determine if the difference is significant
Key Metrics
| Metric | What It Measures |
|---|---|
| CTR (Click-Through Rate) | % of recommendations that were clicked |
| Conversion Rate | % of recommendations that led to a purchase/signup |
| Session Duration | How long users stay engaged |
| Retention | Do users come back? |
| Revenue per User | Direct business impact |
| Diversity | Variety of recommended categories/genres |
| Coverage | % of catalog that gets recommended |
Common Pitfalls
Online Learning and Bandits
Static recommendation models become stale as user preferences and item catalogs evolve. Online learning continuously updates models from fresh interactions.
The Exploration-Exploitation Dilemma
Multi-Armed Bandits
Bandits formalize the explore/exploit trade-off. Each item is an "arm" of the bandit machine.
#### Epsilon-Greedy
#### Upper Confidence Bound (UCB)
UCB(i) = mean_reward(i) + c * sqrt(ln(total_pulls) / n_pulls(i))

#### Thompson Sampling
import numpy as np


class ThompsonSamplingBandit:
    """
    Thompson Sampling for recommendation exploration.

    Each item's click rate is modelled by a Beta posterior; every item
    starts from the uniform Beta(1, 1) prior.
    """

    def __init__(self, n_items):
        self.n_items = n_items
        # Beta(alpha, beta): alpha = successes + 1, beta = failures + 1.
        self.alpha = np.ones(n_items)       # prior successes
        self.beta_param = np.ones(n_items)  # prior failures

    def select(self, candidate_ids, n_select=5):
        """
        Choose n_select items by probability matching: draw one sample per
        candidate from its Beta posterior and keep the largest draws.
        """
        draws = np.array([
            np.random.beta(self.alpha[item], self.beta_param[item])
            for item in candidate_ids
        ])
        best_positions = np.argsort(-draws)[:n_select]
        return [candidate_ids[pos] for pos in best_positions]

    def update(self, item_id, reward):
        """
        Fold one observed reward into the posterior.
        reward: 1 = click/conversion, 0 = no click.
        """
        if reward > 0:
            self.alpha[item_id] += 1
        else:
            self.beta_param[item_id] += 1

    def get_stats(self, item_id):
        """Summarize the current posterior beliefs about one item."""
        a, b = self.alpha[item_id], self.beta_param[item_id]
        total = a + b
        return {
            "mean": a / total,
            "std": np.sqrt(a * b / (total ** 2 * (total + 1))),
            "total_trials": total - 2,  # subtract the two prior pseudo-counts
        }
class UCBBandit:
    """Upper Confidence Bound bandit with exploration weight c."""

    def __init__(self, n_items, c=2.0):
        self.n_items = n_items
        self.c = c                        # exploration strength
        self.rewards = np.zeros(n_items)  # cumulative reward per item
        self.counts = np.zeros(n_items)   # number of pulls per item
        self.total_count = 0              # total pulls across all items

    def select(self, candidate_ids, n_select=5):
        """Score each candidate as mean + confidence bonus; return the top n."""
        ucb_scores = []
        for item in candidate_ids:
            pulls = self.counts[item]
            if pulls == 0:
                # Untried items get +inf so they are always explored first.
                ucb_scores.append(float('inf'))
                continue
            mean = self.rewards[item] / pulls
            bonus = self.c * np.sqrt(np.log(self.total_count + 1) / pulls)
            ucb_scores.append(mean + bonus)
        best_positions = np.argsort(-np.array(ucb_scores))[:n_select]
        return [candidate_ids[pos] for pos in best_positions]

    def update(self, item_id, reward):
        """Record one observed reward for item_id."""
        self.counts[item_id] += 1
        self.rewards[item_id] += reward
        self.total_count += 1
78# Simulation: 20 items with different true click rates
79np.random.seed(42)
80n_items = 20
81true_click_rates = np.random.beta(2, 8, n_items) # Most items have low CTR
82true_click_rates[3] = 0.5 # Item 3 is great
83true_click_rates[7] = 0.4 # Item 7 is good
84true_click_rates[15] = 0.45 # Item 15 is good
85
86ts_bandit = ThompsonSamplingBandit(n_items)
87ucb_bandit = UCBBandit(n_items)
88
89candidates = list(range(n_items))
90n_rounds = 500
91ts_cumulative_reward = 0
92ucb_cumulative_reward = 0
93
94for t in range(n_rounds):
95 # Thompson Sampling
96 ts_selected = ts_bandit.select(candidates, n_select=1)
97 ts_item = ts_selected[0]
98 ts_reward = 1 if np.random.random() < true_click_rates[ts_item] else 0
99 ts_bandit.update(ts_item, ts_reward)
100 ts_cumulative_reward += ts_reward
101
102 # UCB
103 ucb_selected = ucb_bandit.select(candidates, n_select=1)
104 ucb_item = ucb_selected[0]
105 ucb_reward = 1 if np.random.random() < true_click_rates[ucb_item] else 0
106 ucb_bandit.update(ucb_item, ucb_reward)
107 ucb_cumulative_reward += ucb_reward
108
109print(f"True best items: {np.argsort(-true_click_rates)[:3]} "
110 f"(rates: {np.sort(true_click_rates)[-3:][::-1].round(3)})")
111print(f"\nAfter {n_rounds} rounds:")
112print(f" Thompson Sampling cumulative reward: {ts_cumulative_reward}")
113print(f" UCB cumulative reward: {ucb_cumulative_reward}")
114
115# Show what TS learned
116print("\nThompson Sampling learned rates (top 5):")
117stats = [(i, ts_bandit.get_stats(i)) for i in range(n_items)]
118stats.sort(key=lambda x: -x[1]["mean"])
119for item_id, s in stats[:5]:
120 print(f" Item {item_id}: estimated={s['mean']:.3f} (true={true_click_rates[item_id]:.3f}), "
121 f"trials={s['total_trials']:.0f}")Evaluation Metrics for Recommender Systems
Offline evaluation helps iterate quickly during development. The right metric depends on the task.
Ranking Metrics
NDCG (Normalized Discounted Cumulative Gain): Measures ranking quality, giving more credit to relevant items ranked higher.
DCG@K = sum(rel_i / log2(i+1)) for i = 1..K
NDCG@K = DCG@K / IDCG@K (normalized by ideal ranking)
MAP (Mean Average Precision): Average of precision at each relevant item's rank.
AP = sum(Precision@k * rel(k)) / number_of_relevant_items
MAP = mean of AP over all users
Hit Rate (Recall@K): Fraction of users whose relevant item appears in top-K.
Beyond Accuracy
| Metric | What It Measures |
|---|---|
| Diversity | Intra-list distance: how different are the recommended items from each other? |
| Novelty | How popular are recommended items? Less popular = more novel |
| Coverage | What fraction of the catalog gets recommended? |
| Serendipity | Were recommendations both relevant AND unexpected? |
| Fairness | Is recommendation quality equitable across user/item groups? |
Offline vs Online Metrics
Offline metrics are computed on held-out interaction data. They are useful for:
- Fast, cheap iteration and model selection without touching live traffic
- Catching regressions before a model is ever deployed
But they do not replace A/B testing because:
- Logged interactions are biased by whatever policy produced them
- Offline gains often fail to translate into real clicks, engagement, or revenue
import numpy as np


def dcg_at_k(relevances, k):
    """Discounted Cumulative Gain over the first k relevance scores."""
    gains = np.array(relevances[:k])
    # Ranks are 1-based, so the discounts are log2(2), log2(3), ...
    ranks = np.arange(1, len(gains) + 1)
    return np.sum(gains / np.log2(ranks + 1))
def ndcg_at_k(relevances, k):
    """Normalized DCG at K: achieved DCG divided by the best achievable DCG."""
    achieved = dcg_at_k(relevances, k)
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    # A zero ideal means there are no relevant items; define NDCG as 0 there.
    return achieved / ideal if ideal > 0 else 0.0
def precision_at_k(relevances, k):
    """
    Precision at K: fraction of the first k recommendation slots that hold
    a relevant item.

    Fixes two edge cases of the naive np.mean(relevances[:k]):
    * k <= 0 produced nan (mean of an empty slice) -- now returns 0.0;
    * with fewer than k items present, the mean divided by the shorter
      length and over-stated precision -- now always divides by k.
    """
    if k <= 0:
        return 0.0
    top_k = np.asarray(relevances[:k], dtype=float)
    # Divide by k, not by how many items happened to be present.
    return float(top_k.sum() / k)
def average_precision(relevances):
    """
    Average Precision for one user: the mean of precision@k taken at every
    rank k where a relevant item appears (0.0 if nothing is relevant).
    """
    rels = np.array(relevances)
    if rels.sum() == 0:
        return 0.0
    running_sum = 0
    precisions_at_hits = []
    for rank, rel in enumerate(rels, start=1):
        running_sum += rel
        if rel == 1:
            # For 0/1 labels, precision@rank == (#relevant so far) / rank.
            precisions_at_hits.append(running_sum / rank)
    return np.mean(precisions_at_hits) if precisions_at_hits else 0.0
def hit_rate_at_k(relevances, k):
    """Return 1.0 when at least one relevant item appears in the top k, else 0.0."""
    top_k_hits = sum(relevances[:k])
    return float(top_k_hits > 0)
def intra_list_diversity(item_features, recommended_ids):
    """
    Mean pairwise cosine distance between recommended items.
    Higher values mean a more varied list; fewer than 2 items yields 0.0.
    """
    if len(recommended_ids) < 2:
        return 0.0
    vectors = item_features[recommended_ids]
    num = len(recommended_ids)
    dist_sum = 0
    n_pairs = 0
    for a in range(num):
        for b in range(a + 1, num):
            # Cosine distance = 1 - cosine similarity.
            denom = (np.linalg.norm(vectors[a]) * np.linalg.norm(vectors[b]) + 1e-8)
            similarity = np.dot(vectors[a], vectors[b]) / denom
            dist_sum += 1 - similarity
            n_pairs += 1
    return dist_sum / n_pairs
def catalog_coverage(all_recommendations, n_items):
    """Fraction of the catalog that shows up in at least one user's list."""
    seen = set()
    for rec_list in all_recommendations:
        seen |= set(rec_list)
    return len(seen) / n_items
# Example: Evaluate two models
np.random.seed(42)

# Model A: Good ranking but low diversity
model_a_relevances = [
    [1, 1, 0, 1, 0, 0, 0, 0, 0, 0],
    [1, 0, 1, 0, 0, 1, 0, 0, 0, 0],
    [0, 1, 1, 0, 0, 0, 1, 0, 0, 0],
]

# Model B: Decent ranking, better diversity
model_b_relevances = [
    [1, 0, 1, 0, 1, 0, 0, 0, 0, 0],
    [0, 1, 0, 1, 0, 0, 1, 0, 0, 0],
    [1, 0, 0, 1, 0, 1, 0, 0, 0, 0],
]

for name, relevances_list in [("Model A", model_a_relevances), ("Model B", model_b_relevances)]:
    ndcgs = [ndcg_at_k(rel, 10) for rel in relevances_list]
    maps = [average_precision(rel) for rel in relevances_list]
    hrs = [hit_rate_at_k(rel, 5) for rel in relevances_list]

    print(f"\n{name}:")
    print(f"  NDCG@10: {np.mean(ndcgs):.4f}")
    print(f"  MAP: {np.mean(maps):.4f}")
    print(f"  HR@5: {np.mean(hrs):.4f}")

# Diversity example
n_items = 50
item_features = np.random.randn(n_items, 10)
item_features /= np.linalg.norm(item_features, axis=1, keepdims=True)

similar_recs = [0, 1, 2, 3, 4]      # Nearby items
diverse_recs = [0, 10, 20, 30, 40]  # Spread out items

print(f"\nDiversity (similar recs): {intra_list_diversity(item_features, similar_recs):.4f}")
print(f"Diversity (diverse recs): {intra_list_diversity(item_features, diverse_recs):.4f}")

# Coverage
all_recs_narrow = [[0, 1, 2], [0, 1, 3], [0, 2, 4]]
all_recs_wide = [[0, 10, 20], [5, 15, 25], [30, 35, 40]]
print(f"\nCoverage (narrow): {catalog_coverage(all_recs_narrow, n_items):.2%}")
print(f"Coverage (wide): {catalog_coverage(all_recs_wide, n_items):.2%}")