def main():
    """Run the quantized-inference demo end to end.

    Builds the mythos_1b model, quantizes its expert layers to INT4, places
    the model on the GPU when CUDA is available (with expert offloading),
    times a short generation run, and prints memory / offloader statistics.
    """
    gpu_experts = 4  # experts per MoE layer left on the GPU
    cache_experts = 16  # experts per MoE layer kept in the CPU cache
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    print("=" * 60)
    print("OpenMythos Quantized Inference Demo")
    print("=" * 60)

    # 1. Create model
    print("\n[1/5] Creating mythos_1b model...")
    cfg = mythos_1b()
    model = OpenMythos(cfg)
    print(f" Parameters: {sum(p.numel() for p in model.parameters()):,}")

    # 2. Quantize to INT4
    print("\n[2/5] Quantizing to INT4 (expert FFN layers only)...")
    model = quantize_model(model, bits=4, group_size=128)
    print_quantization_summary(model)

    # Inference only. The model must live on the same device as the inputs,
    # and ExpertOffloader.prepare() treats the first experts as already being
    # on the GPU, so the move has to happen before the offloader is set up.
    model.eval()
    model.to(device)

    # 3. Setup expert offloading
    print("\n[3/5] Setting up expert offloading...")
    print(f" GPU: {torch.cuda.get_device_name(0) if use_cuda else 'CPU'}")

    offloader = None
    if use_cuda:
        offloader = ExpertOffloader(
            model,
            gpu_experts=gpu_experts,
            cache_experts=cache_experts,
        )
        offloader.prepare()
        print(f" GPU experts: {gpu_experts} | CPU cache: {cache_experts} | Disk: rest")
    else:
        print(" Running on CPU (no offloading needed)")

    # 4. Generate text
    print("\n[4/5] Generating text...")
    input_ids = torch.randint(0, cfg.vocab_size, (1, 32), device=device)

    with torch.no_grad():
        # Warmup (also under no_grad so no autograd graph is retained)
        _ = model.generate(input_ids, max_new_tokens=4, n_loops=2)

        # CUDA kernels run asynchronously: synchronize before reading the
        # clock on either side of the timed region.
        if use_cuda:
            torch.cuda.synchronize()
        start = time.perf_counter()
        output = model.generate(input_ids, max_new_tokens=64, n_loops=4)
        if use_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start

    tokens_generated = output.shape[1] - input_ids.shape[1]
    # Guard against a zero-length interval on very coarse clocks.
    tokens_per_sec = tokens_generated / elapsed if elapsed > 0 else float("inf")

    print(f" Generated {tokens_generated} tokens in {elapsed:.2f}s")
    print(f" Speed: {tokens_per_sec:.1f} tokens/sec")

    # 5. Memory usage
    print("\n[5/5] Memory usage:")
    if use_cuda:
        allocated = torch.cuda.memory_allocated() / 1024 / 1024
        reserved = torch.cuda.memory_reserved() / 1024 / 1024
        print(f" GPU allocated: {allocated:.1f} MB")
        print(f" GPU reserved: {reserved:.1f} MB")

    if offloader is not None:
        print("\nOffloader stats:")
        offloader.print_stats()

    print("\n" + "=" * 60)
    print("Done! Model runs successfully with INT4 quantization.")
    print("=" * 60)
class ExpertOffloader:
    """
    Manages expert placement across the GPU / CPU / disk memory hierarchy.

    For every MoE layer (any sub-module exposing both ``experts`` and
    ``router``) the offloader records where each expert lives, keeps at most
    ``gpu_experts`` of them marked as GPU-resident, and evicts the least
    recently used one to CPU when another expert has to be brought in.
    Expert weights can also be written to and restored from ``storage_dir``.

    Experts are only moved when ``load_expert`` is called; this class does not
    install forward hooks itself.  ``cache_experts`` is stored for reporting
    but no CPU-cache size limit is enforced here.

    Args:
        model: OpenMythos model with MoE layers
        gpu_experts: Number of experts to keep on GPU per layer (default: 4)
        cache_experts: Number of experts to keep in CPU RAM (default: 16)
        storage_dir: Directory for expert storage (default: /tmp/mythos_experts)
        preload_all: If True, move all experts to CPU at init (default: False)

    Raises:
        ValueError: If ``gpu_experts`` or ``cache_experts`` is negative.
    """

    def __init__(
        self,
        model: nn.Module,
        gpu_experts: int = 4,
        cache_experts: int = 16,
        storage_dir: str = "/tmp/mythos_experts",
        preload_all: bool = False,
    ):
        if gpu_experts < 0:
            raise ValueError(f"gpu_experts must be non-negative, got {gpu_experts}")
        if cache_experts < 0:
            raise ValueError(f"cache_experts must be non-negative, got {cache_experts}")

        self.model = model
        self.gpu_experts = gpu_experts
        self.cache_experts = cache_experts
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        # Expert state tracking
        self.expert_states: Dict[str, Dict[int, str]] = {}  # layer_name -> {expert_id -> location}
        self.gpu_cache: Dict[str, Dict[int, nn.Module]] = {}  # layer_name -> {expert_id -> module}
        self.cpu_cache: Dict[str, Dict[int, nn.Module]] = {}  # layer_name -> {expert_id -> module}
        self.lru_order: Dict[str, List[int]] = {}  # layer_name -> [expert_ids, oldest first]

        # Statistics
        self.stats = {
            "gpu_hits": 0,
            "cpu_hits": 0,
            "disk_loads": 0,
            "evictions": 0,
            "total_requests": 0,
        }

        # Discover MoE layers
        self.moe_layers = self._discover_moe_layers()

        if preload_all:
            self._preload_to_cpu()

    def _discover_moe_layers(self) -> Dict[str, nn.Module]:
        """Find all sub-modules that expose both ``experts`` and ``router``."""
        moe_layers = {}
        for name, module in self.model.named_modules():
            if hasattr(module, "experts") and hasattr(module, "router"):
                moe_layers[name] = module
                logger.info(
                    "Discovered MoE layer: %s with %d experts", name, len(module.experts)
                )
        return moe_layers

    def _preload_to_cpu(self):
        """Move every expert of every MoE layer to CPU and record it as such."""
        for layer_name, moe_layer in self.moe_layers.items():
            for expert_id in range(len(moe_layer.experts)):
                self._move_expert_to_device(layer_name, expert_id, "cpu")
                self._record_location(layer_name, expert_id, "cpu")
                self._update_lru(layer_name, expert_id)

    def _get_expert_module(self, layer_name: str, expert_id: int) -> nn.Module:
        """Return the expert module for ``layer_name`` / ``expert_id``.

        Raises:
            KeyError: Unknown MoE layer name.
            AttributeError: The layer has no ``experts`` attribute.
            IndexError: ``expert_id`` is outside the layer's expert range.
        """
        if layer_name not in self.moe_layers:
            raise KeyError(f"MoE layer '{layer_name}' not found. Available: {list(self.moe_layers.keys())}")
        layer = self.moe_layers[layer_name]
        if not hasattr(layer, "experts"):
            raise AttributeError(f"Layer '{layer_name}' has no 'experts' attribute")
        if expert_id < 0 or expert_id >= len(layer.experts):
            raise IndexError(f"Expert {expert_id} out of range [0, {len(layer.experts)-1}] for layer '{layer_name}'")
        return layer.experts[expert_id]

    def _move_expert_to_device(
        self, layer_name: str, expert_id: int, device: str
    ) -> nn.Module:
        """Move an expert to the specified device and return it."""
        expert = self._get_expert_module(layer_name, expert_id)
        return expert.to(device)

    def _record_location(self, layer_name: str, expert_id: int, location: str):
        """Record an expert's location and keep both caches consistent.

        An expert is listed in exactly one of ``gpu_cache`` / ``cpu_cache``;
        anything other than ``"gpu"`` is filed under the CPU cache.
        """
        expert = self._get_expert_module(layer_name, expert_id)
        self.expert_states.setdefault(layer_name, {})[expert_id] = location
        gpu_entries = self.gpu_cache.setdefault(layer_name, {})
        cpu_entries = self.cpu_cache.setdefault(layer_name, {})
        if location == "gpu":
            cpu_entries.pop(expert_id, None)
            gpu_entries[expert_id] = expert
        else:
            gpu_entries.pop(expert_id, None)
            cpu_entries[expert_id] = expert

    def __repr__(self) -> str:
        """String representation of the offloader."""
        total_experts = sum(len(m.experts) for m in self.moe_layers.values())
        return (
            f"ExpertOffloader("
            f"moe_layers={len(self.moe_layers)}, "
            f"total_experts={total_experts}, "
            f"gpu_experts={self.gpu_experts}, "
            f"cache_experts={self.cache_experts})"
        )

    def _expert_path(self, layer_name: str, expert_id: int) -> Path:
        """Return the on-disk path used for one expert's weights."""
        safe_name = layer_name.replace(".", "_")
        return self.storage_dir / f"{safe_name}_expert_{expert_id}.pt"

    def _save_expert_to_disk(self, layer_name: str, expert_id: int):
        """Save one expert's state dict (as CPU tensors) to ``storage_dir``."""
        expert = self._get_expert_module(layer_name, expert_id)
        torch.save(
            {k: v.cpu() for k, v in expert.state_dict().items()},
            self._expert_path(layer_name, expert_id),
        )

    def _load_expert_from_disk(self, layer_name: str, expert_id: int) -> nn.Module:
        """Restore one expert's weights from ``storage_dir`` and return it.

        Raises:
            FileNotFoundError: No saved weights exist for this expert.
        """
        path = self._expert_path(layer_name, expert_id)
        if not path.exists():
            raise FileNotFoundError(f"Expert storage not found: {path}")

        state_dict = torch.load(path, map_location="cpu", weights_only=True)
        expert = self._get_expert_module(layer_name, expert_id)
        expert.load_state_dict(state_dict)
        return expert

    def _update_lru(self, layer_name: str, expert_id: int):
        """Mark ``expert_id`` as the most recently used expert of the layer."""
        order = self.lru_order.setdefault(layer_name, [])
        if expert_id in order:
            order.remove(expert_id)
        order.append(expert_id)

    def _evict_from_gpu(self, layer_name: str):
        """Evict least recently used experts so one more fits on the GPU.

        The decision is based on how many experts are actually marked as
        GPU-resident, not on the length of the LRU list, so the
        ``gpu_experts`` budget holds for any layer size.
        """
        states = self.expert_states.get(layer_name, {})
        # GPU-resident experts, oldest first.
        resident = [
            eid
            for eid in self.lru_order.get(layer_name, [])
            if states.get(eid) == "gpu"
        ]
        while resident and len(resident) >= self.gpu_experts:
            evict_id = resident.pop(0)
            self._move_expert_to_device(layer_name, evict_id, "cpu")
            self._record_location(layer_name, evict_id, "cpu")
            self.stats["evictions"] += 1
            logger.debug(
                "Evicted expert %s from GPU to CPU (layer: %s)", evict_id, layer_name
            )

    def prepare(self):
        """
        Prepare the model for offloaded inference.

        For each MoE layer the first ``gpu_experts`` experts are recorded as
        GPU-resident and left on whatever device they currently occupy (the
        model is expected to be on the GPU already); all remaining experts
        are moved to CPU.
        """
        for layer_name, moe_layer in self.moe_layers.items():
            self.expert_states[layer_name] = {}
            self.gpu_cache[layer_name] = {}
            self.cpu_cache[layer_name] = {}
            self.lru_order[layer_name] = []

            for expert_id in range(len(moe_layer.experts)):
                if expert_id < self.gpu_experts:
                    # Keep on GPU
                    self._record_location(layer_name, expert_id, "gpu")
                else:
                    # Move to CPU
                    self._move_expert_to_device(layer_name, expert_id, "cpu")
                    self._record_location(layer_name, expert_id, "cpu")

                self._update_lru(layer_name, expert_id)

        logger.info(
            "Prepared %d MoE layers for offloaded inference. GPU experts per layer: %d",
            len(self.moe_layers),
            self.gpu_experts,
        )

    def load_expert(self, layer_name: str, expert_id: int, target_device: str = "cuda"):
        """
        Load an expert to the target device, managing the cache hierarchy.

        A GPU-resident expert only has its LRU position refreshed.  A
        CPU-resident expert is moved to ``target_device`` after making room.
        Any other state is treated as cold storage: the weights are first
        restored from ``storage_dir``.

        Raises:
            FileNotFoundError: The expert is neither on GPU nor CPU and no
                saved weights exist for it.
        """
        self.stats["total_requests"] += 1
        state = self.expert_states.get(layer_name, {}).get(expert_id)

        if state == "gpu":
            # Already on GPU
            self.stats["gpu_hits"] += 1
            self._update_lru(layer_name, expert_id)
            return

        if state == "cpu":
            self.stats["cpu_hits"] += 1
        else:
            # On disk (cold storage): restore the weights first
            self.stats["disk_loads"] += 1
            logger.debug(
                "Loading expert %s from disk (layer: %s)", expert_id, layer_name
            )
            self._load_expert_from_disk(layer_name, expert_id)

        # Make room on the GPU, then bring the expert in
        self._evict_from_gpu(layer_name)
        self._move_expert_to_device(layer_name, expert_id, target_device)
        self._record_location(layer_name, expert_id, "gpu")
        self._update_lru(layer_name, expert_id)

    def offload_all_to_cpu(self):
        """Move all GPU-resident experts to CPU (for memory cleanup)."""
        for layer_name in self.moe_layers:
            states = self.expert_states.get(layer_name, {})
            on_gpu = [eid for eid, location in states.items() if location == "gpu"]
            for expert_id in on_gpu:
                self._move_expert_to_device(layer_name, expert_id, "cpu")
                self._record_location(layer_name, expert_id, "cpu")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def save_all_to_disk(self):
        """Save every expert of every MoE layer to ``storage_dir``."""
        for layer_name, moe_layer in self.moe_layers.items():
            for expert_id in range(len(moe_layer.experts)):
                self._save_expert_to_disk(layer_name, expert_id)
        logger.info("Saved all experts to %s", self.storage_dir)

    def get_stats(self) -> dict:
        """Return the raw counters plus hit / load rates in percent."""
        total = self.stats["total_requests"] or 1  # avoid division by zero
        return {
            **self.stats,
            "gpu_hit_rate": self.stats["gpu_hits"] / total * 100,
            "cpu_hit_rate": self.stats["cpu_hits"] / total * 100,
            "disk_load_rate": self.stats["disk_loads"] / total * 100,
        }

    def print_stats(self):
        """Print offloading statistics."""
        s = self.get_stats()
        print("=" * 50)
        print("Expert Offloader Statistics")
        print("=" * 50)
        print(f"Total requests: {s['total_requests']}")
        print(f"GPU hits: {s['gpu_hits']} ({s['gpu_hit_rate']:.1f}%)")
        print(f"CPU hits: {s['cpu_hits']} ({s['cpu_hit_rate']:.1f}%)")
        print(f"Disk loads: {s['disk_loads']} ({s['disk_load_rate']:.1f}%)")
        print(f"GPU evictions: {s['evictions']}")
        print("=" * 50)
class QuantizedLinear(nn.Module):
    """
    Memory-efficient quantized linear layer.

    Stores weights in INT4 or INT8 format with per-group scale and offset
    buffers (both kept in FP16).  INT4 values are packed two per byte.
    Weights are dequantized on the fly in ``forward``.

    Args:
        original_linear: The FP16/FP32 linear layer to quantize
        bits: Quantization precision (4 or 8)
        group_size: Number of weights sharing a scale factor (default: 128)

    Raises:
        ValueError: If ``bits`` is not 4 or 8, or ``group_size`` is not positive.
        TypeError: If ``original_linear`` is not an ``nn.Linear``.
    """

    def __init__(
        self,
        original_linear: nn.Linear,
        bits: int = 4,
        group_size: int = 128,
    ):
        super().__init__()
        if bits not in (4, 8):
            raise ValueError(f"Only INT4 and INT8 supported, got {bits}")
        if group_size <= 0:
            raise ValueError(f"group_size must be positive, got {group_size}")
        if not isinstance(original_linear, nn.Linear):
            raise TypeError(f"Expected nn.Linear, got {type(original_linear).__name__}")

        self.bits = bits
        self.group_size = group_size
        self.in_features = original_linear.in_features
        self.out_features = original_linear.out_features
        self.has_bias = original_linear.bias is not None

        # Quantize weights
        weight = original_linear.weight.data.float()  # [out, in]
        qweight, scales, zeros = self._quantize_weight(weight)

        # Store quantized weights
        if bits == 4:
            # Pack two INT4 values into one INT8
            self.register_buffer("qweight", self._pack_int4(qweight), persistent=True)
        else:
            self.register_buffer("qweight", qweight.to(torch.int8), persistent=True)

        self.register_buffer("scales", scales.half(), persistent=True)
        self.register_buffer("zeros", zeros.half(), persistent=True)

        if self.has_bias:
            self.register_buffer(
                "bias", original_linear.bias.data.half(), persistent=True
            )
        else:
            self.bias = None

    def _quantize_weight(
        self, weight: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Quantize a ``[out, in]`` weight tensor with group-wise scaling.

        Returns:
            ``(qweight, scales, zeros)`` where ``qweight`` is ``[out, padded_in]``
            holding integer values (as floats) in the target range, and
            ``scales`` / ``zeros`` are ``[out, num_groups]`` such that
            ``weight ~= qweight * scales + zeros``.
        """
        out_features, in_features = weight.shape

        # Pad in_features to be divisible by group_size
        pad_size = (self.group_size - in_features % self.group_size) % self.group_size
        if pad_size > 0:
            weight = F.pad(weight, (0, pad_size))

        padded_in = weight.shape[1]
        num_groups = padded_in // self.group_size

        # Reshape: [out, num_groups, group_size]
        weight_groups = weight.reshape(out_features, num_groups, self.group_size)

        # Per-group min/max
        w_min = weight_groups.min(dim=-1, keepdim=True).values  # [out, num_groups, 1]
        w_max = weight_groups.max(dim=-1, keepdim=True).values  # [out, num_groups, 1]

        if self.bits == 4:
            qmin, qmax = 0, 15
        else:
            qmin, qmax = -127, 127

        # Scale maps the group's value range onto the integer range
        scales = ((w_max - w_min) / (qmax - qmin)).clamp(min=1e-10)  # avoid division by zero

        # (w - w_min) / scale lies in [0, qmax - qmin]; shift by qmin so the
        # whole range fits the target integer range instead of being clipped.
        qweight = ((weight_groups - w_min) / scales).round() + qmin
        qweight = qweight.clamp(qmin, qmax)

        # Fold the shift into the stored offset so that dequantization stays
        # ``q * scale + zero``:  (q - qmin) * scale + w_min.
        zeros = w_min - qmin * scales

        # Reshape back
        qweight = qweight.reshape(out_features, padded_in)
        scales = scales.reshape(out_features, num_groups)
        zeros = zeros.reshape(out_features, num_groups)

        return qweight, scales, zeros

    def _pack_int4(self, qweight: torch.Tensor) -> torch.Tensor:
        """Pack pairs of INT4 values (0-15) into single bytes.

        Args:
            qweight: ``[out, in]`` tensor with values in ``[0, 15]``.

        Returns:
            ``[out, in // 2]`` int8 tensor; the even column is the low nibble.

        Raises:
            ValueError: If the number of columns is odd.
        """
        out_features, in_features = qweight.shape
        if in_features % 2 != 0:
            raise ValueError("in_features must be even for INT4 packing")

        # Bitwise operators are only defined for integer tensors, and the
        # packed byte can reach 255, so work in uint8.
        nibbles = qweight.to(torch.uint8).reshape(out_features, in_features // 2, 2)
        packed = nibbles[:, :, 0] | (nibbles[:, :, 1] << 4)
        # Reinterpret the bytes as int8 without changing any bits.
        return packed.view(torch.int8)

    def _unpack_int4(self, qweight: torch.Tensor) -> torch.Tensor:
        """Unpack ``[out, in // 2]`` packed bytes into ``[out, in]`` float values."""
        out_features, half_in = qweight.shape

        # Masking after the shift discards sign-extension bits of int8 values.
        low = (qweight & 0x0F).to(torch.float32)
        high = ((qweight >> 4) & 0x0F).to(torch.float32)

        # Interleave: [out, in]
        unpacked = torch.stack([low, high], dim=-1).reshape(out_features, half_in * 2)
        return unpacked

    def _dequantize_weight(self) -> torch.Tensor:
        """Dequantize the stored weights to a float32 ``[out, in_features]`` tensor."""
        if self.bits == 4:
            qweight_fp = self._unpack_int4(self.qweight)
        else:
            qweight_fp = self.qweight.float()

        # Apply group-wise dequantization
        out_features, in_features = qweight_fp.shape
        num_groups = in_features // self.group_size
        qweight_groups = qweight_fp.reshape(out_features, num_groups, self.group_size)

        # Dequantize: weight = qweight * scale + zero
        dequant = (
            qweight_groups * self.scales.float().unsqueeze(-1)
            + self.zeros.float().unsqueeze(-1)
        )
        weight = dequant.reshape(out_features, in_features)

        # Trim padding added during quantization
        return weight[:, : self.in_features]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass with on-the-fly dequantization.

        The computation runs in the dtype of ``x`` (FP16 for non-floating
        inputs), so callers get back the precision they passed in.

        Args:
            x: Input tensor of shape (..., in_features)

        Returns:
            Output tensor of shape (..., out_features)
        """
        compute_dtype = x.dtype if x.is_floating_point() else torch.float16
        weight = self._dequantize_weight().to(compute_dtype)
        bias = self.bias.to(compute_dtype) if self.has_bias else None
        return F.linear(x.to(compute_dtype), weight, bias)

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"QuantizedLinear("
            f"in={self.in_features}, "
            f"out={self.out_features}, "
            f"bits={self.bits}, "
            f"group_size={self.group_size}, "
            f"bias={self.has_bias})"
        )
def quantize_model(
    model: nn.Module,
    bits: int = 4,
    group_size: int = 128,
    quantize_experts_only: bool = True,
) -> nn.Module:
    """
    Quantize an OpenMythos model.

    With ``quantize_experts_only=True`` every sub-module exposing both
    ``experts`` and ``router`` is handed to ``quantize_moe_experts``; all
    other layers are left untouched.  Otherwise every ``nn.Linear`` that has
    a parent module is replaced by its quantized counterpart.

    Args:
        model: OpenMythos model to quantize
        bits: Quantization precision (4 or 8)
        group_size: Group size for quantization
        quantize_experts_only: If True, only quantize MoE experts (recommended)

    Returns:
        Quantized model (modifies in-place)

    Raises:
        ValueError: If bits or group_size are invalid
    """
    if bits not in (4, 8):
        raise ValueError(f"Only INT4 and INT8 supported, got {bits}")
    if group_size <= 0:
        raise ValueError(f"group_size must be positive, got {group_size}")

    # Snapshot the module tree once: layers are swapped below, and the tree
    # must not change underneath a live named_modules() generator.  The
    # snapshot also serves as the name -> module lookup for parents.
    modules = dict(model.named_modules())

    if quantize_experts_only:
        moe_layers = [
            module
            for module in modules.values()
            if hasattr(module, "experts") and hasattr(module, "router")
        ]
        for moe_layer in moe_layers:
            quantize_moe_experts(moe_layer, bits, group_size)
        logger.info(
            "Quantized %d MoE layers to INT%s (group_size=%s)",
            len(moe_layers),
            bits,
            group_size,
        )
    else:
        # Quantize all linear layers
        quantized = 0
        for name, module in modules.items():
            if not isinstance(module, nn.Linear):
                continue
            if not name:
                # The root module itself is a Linear; it has no parent
                # attribute to rebind, so it cannot be replaced in place.
                logger.warning("Root module is nn.Linear; it is left unquantized")
                continue
            # "a.b.c" -> parent "a.b", attribute "c"; a top-level child has
            # parent "" which named_modules() maps to the model itself.
            parent_name, _, attr_name = name.rpartition(".")
            setattr(
                modules[parent_name],
                attr_name,
                quantize_linear_layer(module, bits, group_size),
            )
            quantized += 1
        logger.info("Quantized %d linear layers to INT%s", quantized, bits)

    return model
class TestQuantizedLinear:
    """Tests for QuantizedLinear module."""

    def test_int4_quantization(self):
        """Test INT4 quantization preserves approximate values."""
        torch.manual_seed(0)  # deterministic weights and inputs
        linear = nn.Linear(256, 128, bias=True)
        linear.weight.data.uniform_(-1, 1)
        linear.bias.data.uniform_(-0.1, 0.1)

        ql = QuantizedLinear(linear, bits=4, group_size=64)

        x = torch.randn(1, 16, 256)
        out_original = linear(x)
        out_quantized = ql(x).float()

        # INT4 is lossy, but should be in the same ballpark.  A norm-based
        # relative error is used because an element-wise ratio explodes
        # whenever a reference output happens to be close to zero.
        rel_error = (out_original - out_quantized).norm() / out_original.norm()
        assert rel_error < 0.2, f"Relative error too high: {rel_error:.4f}"

    def test_int8_quantization(self):
        """Test INT8 quantization is more accurate than INT4."""
        torch.manual_seed(0)
        linear = nn.Linear(256, 128, bias=True)
        linear.weight.data.uniform_(-1, 1)

        ql4 = QuantizedLinear(linear, bits=4, group_size=64)
        ql8 = QuantizedLinear(linear, bits=8, group_size=64)

        x = torch.randn(1, 16, 256)
        out_original = linear(x)
        out_int4 = ql4(x).float()
        out_int8 = ql8(x).float()

        err_int4 = (out_original - out_int4).abs().mean()
        err_int8 = (out_original - out_int8).abs().mean()

        # INT8 should be more accurate
        assert err_int8 < err_int4, "INT8 should have lower error than INT4"

    def test_memory_reduction(self):
        """Test that quantization reduces memory."""
        linear = nn.Linear(1024, 1024, bias=False)

        original_bytes = linear.weight.numel() * 2  # FP16 = 2 bytes

        ql4 = QuantizedLinear(linear, bits=4, group_size=128)
        quantized_bytes = sum(b.numel() * b.element_size() for b in ql4.buffers())

        # INT4 should be ~4x smaller
        assert quantized_bytes < original_bytes / 2, (
            f"Quantized ({quantized_bytes}B) should be much smaller than "
            f"original ({original_bytes}B)"
        )

    def test_output_shape(self):
        """Test output shape is correct."""
        linear = nn.Linear(256, 128, bias=True)
        ql = QuantizedLinear(linear, bits=4, group_size=64)

        x = torch.randn(2, 32, 256)
        out = ql(x)

        assert out.shape == (2, 32, 128), f"Expected shape (2, 32, 128), got {out.shape}"

    def test_group_size_validation(self):
        """Test invalid group_size raises error."""
        linear = nn.Linear(256, 128)
        # QuantizedLinear validates its arguments with ValueError, not assert.
        with pytest.raises(ValueError):
            QuantizedLinear(linear, bits=4, group_size=0)

    def test_bits_validation(self):
        """Test invalid bits raises error."""
        linear = nn.Linear(256, 128)
        with pytest.raises(ValueError):
            QuantizedLinear(linear, bits=3, group_size=64)
class TestExpertOffloader:
    """Tests for ExpertOffloader."""

    @staticmethod
    def _make_model(num_experts, dim):
        """Build a minimal model with a single MoE-like sub-module named ``moe``."""

        class FakeMoE(nn.Module):
            def __init__(self):
                super().__init__()
                self.experts = nn.ModuleList(
                    [nn.Linear(dim, dim) for _ in range(num_experts)]
                )
                self.router = nn.Linear(dim, num_experts)

        class FakeModel(nn.Module):
            def __init__(self):
                super().__init__()
                self.moe = FakeMoE()

        return FakeModel()

    def test_discover_moe_layers(self, tmp_path):
        """Test MoE layer discovery."""
        model = self._make_model(num_experts=8, dim=64)
        # tmp_path keeps the offloader from creating its default directory
        # under the shared /tmp during the test run.
        offloader = ExpertOffloader(
            model, gpu_experts=2, cache_experts=4, storage_dir=str(tmp_path)
        )

        assert "moe" in offloader.moe_layers

    def test_stats_tracking(self, tmp_path):
        """Test statistics are tracked."""
        model = self._make_model(num_experts=4, dim=16)
        offloader = ExpertOffloader(
            model, gpu_experts=2, cache_experts=4, storage_dir=str(tmp_path)
        )

        stats = offloader.get_stats()
        assert "gpu_hits" in stats
        assert "total_requests" in stats
        # Nothing has been requested yet, so the counters start at zero.
        assert stats["gpu_hits"] == 0
        assert stats["total_requests"] == 0