IoT & Microcontroller Deployment
The most extreme edge: deploying ML models on microcontrollers with kilobytes of RAM, milliwatts of power budget, and no operating system. Welcome to TinyML — where a model must fit in less memory than a single image takes on your phone.
Microcontroller Constraints
TensorFlow Lite for Microcontrollers (TFLM)
A stripped-down version of TF Lite designed for bare-metal microcontrollers:
Target Hardware
| Platform | Flash | RAM | Use Cases |
|---|---|---|---|
| Arduino Nano 33 BLE | 1 MB | 256 KB | Keyword spotting, gesture recognition |
| ESP32 | 4-16 MB | 520 KB | Wake word, anomaly detection |
| STM32F746 | 1 MB | 320 KB | Predictive maintenance, simple vision |
| Raspberry Pi Pico | 2 MB | 264 KB | Sensor classification |
The TinyML Pipeline
Train model (PC/cloud)
→ Convert to TF Lite (INT8 quantized)
→ Convert to C array (xxd)
→ Compile into firmware
→ Flash to microcontroller
1# TinyML model preparation pipeline
2import numpy as np
3
class TinyMLPreparer:
    """Prepare a model for microcontroller deployment.

    Checks whether a quantized model's flash and RAM footprint fits a
    target microcontroller's budget, accounting for the TFLM runtime
    overhead and a small stack reserve.
    """

    def __init__(self, model_path: str, target_flash_kb: int,
                 target_ram_kb: int):
        # model_path is informational only; the analysis works from the
        # byte sizes passed to analyze_model, not from the file itself.
        self.model_path = model_path
        self.target_flash_kb = target_flash_kb
        self.target_ram_kb = target_ram_kb

    def analyze_model(self, model_size_bytes: int,
                      peak_activation_bytes: int,
                      runtime_overhead_kb: int = 20,
                      stack_overhead_kb: int = 2) -> bool:
        """Check if a model fits on the target microcontroller.

        Args:
            model_size_bytes: Size of quantized model weights
            peak_activation_bytes: Peak memory for activations during inference
            runtime_overhead_kb: TFLM runtime overhead (~20KB)
            stack_overhead_kb: Stack reserve on top of the activation
                arena (default 2KB, matching the previous hard-coded value)

        Returns:
            True when both the flash and RAM budgets are satisfied.
        """
        model_kb = model_size_bytes / 1024
        activation_kb = peak_activation_bytes / 1024

        # Flash holds the weights plus the TFLM interpreter code;
        # RAM holds the activation arena plus the stack reserve.
        total_flash_kb = model_kb + runtime_overhead_kb
        total_ram_kb = activation_kb + stack_overhead_kb

        flash_ok = total_flash_kb <= self.target_flash_kb
        ram_ok = total_ram_kb <= self.target_ram_kb

        print("=== TinyML Deployment Analysis ===")
        print(f"Target: Flash={self.target_flash_kb}KB, RAM={self.target_ram_kb}KB")
        print()
        print("Flash usage:")
        print(f"  Model weights: {model_kb:.1f} KB")
        print(f"  TFLM runtime: {runtime_overhead_kb} KB")
        print(f"  Total: {total_flash_kb:.1f} / {self.target_flash_kb} KB "
              f"({'OK' if flash_ok else 'EXCEEDS LIMIT'})")
        print()
        print("RAM usage:")
        print(f"  Activations: {activation_kb:.1f} KB")
        print(f"  Stack: {stack_overhead_kb} KB")
        print(f"  Total: {total_ram_kb:.1f} / {self.target_ram_kb} KB "
              f"({'OK' if ram_ok else 'EXCEEDS LIMIT'})")
        print()

        if flash_ok and ram_ok:
            flash_util = total_flash_kb / self.target_flash_kb * 100
            ram_util = total_ram_kb / self.target_ram_kb * 100
            print(f"DEPLOYABLE! Flash: {flash_util:.0f}%, RAM: {ram_util:.0f}%")
        else:
            if not flash_ok:
                reduction = total_flash_kb - self.target_flash_kb
                print(f"Need to reduce model by {reduction:.0f} KB")
            if not ram_ok:
                reduction = total_ram_kb - self.target_ram_kb
                print(f"Need to reduce activations by {reduction:.0f} KB")

        return flash_ok and ram_ok
61
62
def estimate_model_memory(layers, dtype_bytes=1):
    """Estimate model size and peak activation memory.

    Args:
        layers: List of (type, params) tuples
            - ("dense", (input_dim, output_dim))
            - ("conv2d", (in_ch, out_ch, kernel_h, kernel_w))
        dtype_bytes: Bytes per weight (1 for INT8, 4 for FP32)

    Returns:
        Tuple of (model_bytes, peak_activation_bytes).

    Raises:
        ValueError: If a layer type is neither "dense" nor "conv2d".
    """
    total_weights = 0
    peak_activation = 0

    print("Layer-by-layer analysis:")
    for i, (layer_type, params) in enumerate(layers):
        if layer_type == "dense":
            in_dim, out_dim = params
            weights = in_dim * out_dim + out_dim  # weights + bias
            activation = out_dim * dtype_bytes
        elif layer_type == "conv2d":
            in_ch, out_ch, kh, kw = params
            weights = in_ch * out_ch * kh * kw + out_ch
            # Rough estimate: assume a fixed 16x16 spatial output map.
            activation = out_ch * 16 * 16 * dtype_bytes
        else:
            # Previously an unrecognized type either raised NameError
            # (first layer) or silently reused the previous layer's
            # values; fail loudly with a clear message instead.
            raise ValueError(f"Unsupported layer type: {layer_type!r}")

        total_weights += weights
        peak_activation = max(peak_activation, activation)

        weight_kb = weights * dtype_bytes / 1024
        act_kb = activation / 1024
        print(f"  Layer {i}: {layer_type} {params} -> "
              f"weights={weight_kb:.1f}KB, act={act_kb:.1f}KB")

    model_bytes = total_weights * dtype_bytes
    print(f"\nTotal weights: {total_weights:,} ({model_bytes/1024:.1f} KB)")
    print(f"Peak activation: {peak_activation/1024:.1f} KB")

    return model_bytes, peak_activation
101
102
# --- Example: Keyword spotting model for Arduino Nano ---
print("=" * 50)
# Fixed syntax error: the title contains double quotes, so the string
# literal must use single quotes (the original nested "..." unescaped).
print('Keyword Spotting Model ("Hey Device")')
print("=" * 50)

layers = [
    ("conv2d", (1, 8, 3, 3)),       # 8 filters, 3x3
    ("conv2d", (8, 16, 3, 3)),      # 16 filters, 3x3
    ("dense", (16 * 16 * 16, 64)),  # Flatten + dense
    ("dense", (64, 4)),             # 4 classes: hey_device, unknown, silence, noise
]

model_bytes, peak_act = estimate_model_memory(layers, dtype_bytes=1)

print()
preparer = TinyMLPreparer("keyword_model.tflite",
                          target_flash_kb=256,
                          target_ram_kb=64)
preparer.analyze_model(model_bytes, peak_act)

Edge TPUs and Accelerators
For workloads beyond what a CPU microcontroller can handle, dedicated edge ML accelerators provide 10-100x speedup:
Google Coral Edge TPU
NVIDIA Jetson Family
| Model | GPU Cores | AI Performance | Power | Use Case |
|---|---|---|---|---|
| Jetson Nano | 128 CUDA | 472 GFLOPS | 5-10W | Hobbyist, prototype |
| Jetson Xavier NX | 384 CUDA + Tensor | 21 TOPS | 10-15W | Drones, robots |
| Jetson Orin Nano | 1024 CUDA + Tensor | 40 TOPS | 7-15W | Production edge AI |
| Jetson AGX Orin | 2048 CUDA + Tensor | 275 TOPS | 15-60W | Autonomous vehicles |
Real-World Constraints
Memory
Power
Latency
OTA (Over-The-Air) Model Updates
Deployed edge models need updates for:
OTA Update Pipeline
1. Train new model version in the cloud
2. Validate on held-out data and shadow deployment
3. Package the model as a firmware update
4. Distribute to devices (staged rollout: 1% → 10% → 100%)
5. Verify successful update on each device
6. Rollback if metrics degrade
1# OTA Model Update Manager
2from dataclasses import dataclass, field
3from typing import List, Dict, Optional
4from datetime import datetime
5import random
6
@dataclass
class ModelVersion:
    """A deployable model build tracked by the OTA update manager."""
    version: str      # version key, e.g. "2.0"
    size_kb: float    # packaged model size in kilobytes
    accuracy: float   # validation accuracy recorded at training time
    created_at: str   # date string for when the build was produced
13
@dataclass
class Device:
    """One fleet device and the model version it currently runs."""
    device_id: str
    model_version: str
    hardware: str
    last_seen: str
    status: str = "online"  # online, offline, updating
21
class OTAUpdateManager:
    """Registry of fleet devices plus staged OTA rollout logic."""

    def __init__(self):
        self.devices: Dict[str, Device] = {}               # device_id -> Device
        self.model_versions: Dict[str, ModelVersion] = {}  # version key -> ModelVersion
        self.rollout_log: List[dict] = []                  # reserved for audit entries

    def register_device(self, device: Device):
        """Add (or replace) a device in the fleet registry."""
        self.devices[device.device_id] = device

    def add_model_version(self, version: ModelVersion):
        """Register a model build so it can be rolled out."""
        self.model_versions[version.version] = version

    def staged_rollout(self, target_version: str,
                       stages: Optional[List[float]] = None,
                       min_success_rate: float = 0.95):
        """Perform a staged rollout to all online devices.

        Args:
            target_version: Version key of a registered ModelVersion.
            stages: Cumulative fractions of the target fleet attempted
                by the end of each stage; defaults to [0.01, 0.1, 0.5, 1.0].
            min_success_rate: Minimum per-stage success rate; below this
                the rollout halts.

        Returns:
            True when every stage clears the threshold, False when the
            rollout is halted, None when the version is unknown.
        """
        # Fix: the default used to be a mutable list literal in the
        # signature, shared across all calls; build it fresh instead.
        if stages is None:
            stages = [0.01, 0.1, 0.5, 1.0]

        model = self.model_versions.get(target_version)
        if not model:
            print(f"Model version {target_version} not found!")
            return

        online_devices = [d for d in self.devices.values()
                          if d.status == "online"
                          and d.model_version != target_version]

        print(f"Staged rollout: v{target_version}")
        print(f"Target devices: {len(online_devices)}")
        print(f"Stages: {[f'{s:.0%}' for s in stages]}")
        print()

        # Track attempted devices by id: a mutable dataclass instance is
        # unhashable, and the previous "device not in list" membership
        # test was an O(n^2) scan. Failed devices are still counted as
        # attempted, matching the original semantics.
        attempted_ids = set()

        for stage_pct in stages:
            n_target = int(len(online_devices) * stage_pct)
            n_remaining = n_target - len(attempted_ids)

            if n_remaining <= 0:
                continue

            candidates = [d for d in online_devices
                          if d.device_id not in attempted_ids][:n_remaining]

            # Simulate update (some may fail)
            successes = 0
            failures = 0
            for device in candidates:
                success = random.random() < 0.97  # 97% success rate
                if success:
                    device.model_version = target_version
                    successes += 1
                else:
                    failures += 1
                attempted_ids.add(device.device_id)

            success_rate = successes / len(candidates) if candidates else 1
            print(f"Stage {stage_pct:.0%}: "
                  f"{successes}/{len(candidates)} succeeded "
                  f"({success_rate:.1%})")

            if success_rate < min_success_rate:
                print(f"HALT: Success rate {success_rate:.1%} below "
                      f"threshold {min_success_rate:.1%}")
                # NOTE(review): rollback is only announced, not performed —
                # failed devices keep their old version in this simulation.
                print("Rolling back failed devices...")
                return False

        total_updated = sum(
            1 for d in self.devices.values()
            if d.model_version == target_version
        )
        print(f"\nRollout complete: {total_updated}/{len(self.devices)} "
              f"devices on v{target_version}")
        return True

    def fleet_status(self):
        """Print a per-version device-count histogram for the fleet."""
        versions = {}
        for d in self.devices.values():
            versions[d.model_version] = versions.get(d.model_version, 0) + 1

        print("\n=== Fleet Status ===")
        print(f"Total devices: {len(self.devices)}")
        for v, count in sorted(versions.items()):
            pct = count / len(self.devices) * 100
            bar = "#" * int(pct / 2)  # 2% of the fleet per '#' character
            print(f"  v{v}: {count:>4d} ({pct:>5.1f}%) {bar}")
106
107
# --- Simulate an IoT fleet ---
random.seed(42)  # deterministic simulation run
manager = OTAUpdateManager()

# Register model versions
manager.add_model_version(ModelVersion("1.0", 45.2, 0.89, "2024-01-01"))
manager.add_model_version(ModelVersion("2.0", 42.8, 0.93, "2024-03-01"))

# Register 100 devices, ~5% of which are offline
for idx in range(100):
    manager.register_device(Device(
        device_id=f"device_{idx:03d}",
        model_version="1.0",
        hardware="ESP32",
        last_seen="2024-03-15",
        status="online" if random.random() > 0.05 else "offline",
    ))

manager.fleet_status()
print()
manager.staged_rollout("2.0")
manager.fleet_status()