
GPU Acceleration for AI Agents: Performance Optimization in the Cloud

GPU acceleration can dramatically improve AI agent performance, reducing inference latency from seconds to milliseconds and enabling real-time interactive experiences. However, GPU resources in the cloud require specialized configuration, careful cost management, and performance optimization techniques that differ significantly from CPU-based deployments.

GPU vs CPU Performance Characteristics

Understanding when GPU acceleration provides meaningful benefits:

GPU Advantages:

  • Parallel processing: Thousands of cores for matrix operations
  • High memory bandwidth: 900+ GB/s on data-center GPUs vs ~100 GB/s for typical CPU memory
  • Optimized for inference: Hardware-accelerated AI operations
  • Batch processing: Efficient handling of multiple simultaneous requests

GPU Considerations:

  • Cost: 3-10x more expensive than equivalent CPU resources
  • Memory limitations: Fixed GPU memory that can’t be dynamically allocated
  • Cold start overhead: GPU initialization adds startup latency
  • Utilization efficiency: Underutilized GPUs waste significant cost
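Whether the price premium pays off comes down to sustained throughput per dollar. A back-of-envelope sketch (all prices and request rates below are illustrative assumptions, not benchmarks):

```python
def cost_per_1k_requests(hourly_cost: float, requests_per_second: float) -> float:
    """Dollar cost of serving 1,000 requests at a sustained throughput."""
    requests_per_hour = requests_per_second * 3600
    return hourly_cost / requests_per_hour * 1000

# Illustrative only: a GPU instance at ~3x the hourly price but 20x the throughput
cpu_cost = cost_per_1k_requests(hourly_cost=0.17, requests_per_second=2)
gpu_cost = cost_per_1k_requests(hourly_cost=0.526, requests_per_second=40)
print(f"CPU: ${cpu_cost:.4f} per 1k requests")
print(f"GPU: ${gpu_cost:.4f} per 1k requests")
```

The GPU wins per request only while it stays busy; at low traffic the same arithmetic favors the CPU instance.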

Cloud GPU Options Comparison

Provider  Instance Type          GPU      GPU Memory  Cost/Hour  Best For
AWS       g4dn.xlarge            T4       16GB        $0.526     Light inference
AWS       g5.2xlarge             A10G     24GB        $1.006     Balanced workloads
AWS       p4d.24xlarge           8x A100  8x 40GB     $32.77     Heavy training/inference
GCP       n1-standard-4 + 1x T4  T4       16GB        $0.35      Development/testing
GCP       a2-highgpu-1g          A100     40GB        $2.73      Production inference
Azure     NC6s_v3                V100     16GB        $3.06      Mixed workloads

Prices are approximate on-demand rates (us-east-1 or equivalent); check current regional pricing.
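Hourly rates compound quickly for always-on fleets. A quick projection helper (prices copied from the table above; "business hours" is a hypothetical 10 hours/day, 22 days/month schedule):

```python
# Approximate on-demand hourly prices, taken from the comparison table above
INSTANCE_HOURLY_COST = {
    "g4dn.xlarge": 0.526,
    "g5.2xlarge": 1.006,
    "a2-highgpu-1g": 2.73,
    "p4d.24xlarge": 32.77,
}

def monthly_cost(instance: str, hours_per_day: float = 24, days: int = 30) -> float:
    """Projected monthly cost for one instance on a given schedule."""
    return INSTANCE_HOURLY_COST[instance] * hours_per_day * days

for name in INSTANCE_HOURLY_COST:
    always_on = monthly_cost(name)
    scheduled = monthly_cost(name, hours_per_day=10, days=22)
    print(f"{name:>14}: ${always_on:>9,.2f} always-on  ${scheduled:>9,.2f} business hours")
```

Scheduling GPU capacity down outside peak hours roughly halves the bill in this model, which is why the scheduling and autoscaling sections below matter as much as instance selection.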

Container Configuration for GPU Workloads

Properly configure containers for GPU access:

# GPU-optimized Dockerfile
FROM nvidia/cuda:12.1.0-devel-ubuntu22.04 AS builder

# Install Python and build dependencies
RUN apt-get update && apt-get install -y \
    python3.11 \
    python3.11-dev \
    python3-pip \
    build-essential \
    cmake \
    git \
    && rm -rf /var/lib/apt/lists/*

# Set Python 3.11 as default
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.11 1

WORKDIR /build

# Install PyTorch with CUDA support
RUN pip3 install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

# Install other dependencies
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Production stage
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
    python3.11 \
    python3.11-distutils \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/dist-packages /usr/local/lib/python3.11/dist-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Create non-root user
RUN groupadd -r aiuser && useradd -r -g aiuser aiuser

WORKDIR /app

# Copy application code
COPY --chown=aiuser:aiuser src/ ./src/
COPY --chown=aiuser:aiuser models/ ./models/

# Switch to non-root user
USER aiuser

# Set CUDA environment variables
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENV CUDA_VISIBLE_DEVICES=0

# Health check that verifies GPU availability
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD python3 -c "import torch; assert torch.cuda.is_available(); print('GPU available')" || exit 1

EXPOSE 8000

CMD ["python3", "-m", "src.gpu_agent_server"]

Kubernetes GPU Configuration

Configure Kubernetes for GPU workloads:

# GPU node pool configuration (GKE)
apiVersion: v1
kind: ConfigMap
metadata:
  name: gpu-config
data:
  gpu-driver-version: "530.30.02"  # CUDA 12.1 requires driver >= 530.30.02
  cuda-version: "12.1"

---
# GPU-enabled deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-gpu
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ai-agent-gpu
  template:
    metadata:
      labels:
        app: ai-agent-gpu
    spec:
      # Node selection for GPU nodes (GKE accelerator label)
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-tesla-t4
        
      # Toleration for GPU node taints
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
        
      containers:
      - name: ai-agent
        image: ai-agent:gpu-latest
        
        # GPU resource requests and limits
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: 1
          limits:
            memory: "8Gi"
            cpu: "4"
            nvidia.com/gpu: 1
            
        env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0"
        - name: NVIDIA_VISIBLE_DEVICES
          value: "all"
        - name: CUDA_CACHE_DISABLE
          value: "0"
        - name: CUDA_CACHE_PATH
          value: "/tmp/cuda-cache"
          
        # Volume mounts for GPU drivers and cache
        volumeMounts:
        - name: nvidia-driver
          mountPath: /usr/local/nvidia
          readOnly: true
        - name: cuda-cache
          mountPath: /tmp/cuda-cache
          
        # Readiness probe that checks GPU
        readinessProbe:
          exec:
            command:
            - python3
            - -c
            - "import torch; assert torch.cuda.is_available()"
          initialDelaySeconds: 30
          periodSeconds: 10
          
      volumes:
      - name: nvidia-driver
        hostPath:
          path: /usr/local/nvidia
      - name: cuda-cache
        emptyDir:
          sizeLimit: 1Gi

---
# Horizontal Pod Autoscaler for GPU workloads
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-agent-gpu-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent-gpu
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: nvidia_gpu_utilization
      target:
        type: AverageValue
        averageValue: "80"
  # Scale down slowly to avoid thrashing
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
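The scaleDown policy above removes at most 10% of the current replicas per 60-second period, after a 5-minute stabilization window. A simplified simulation (approximating the controller's rounding, not the exact HPA algorithm) shows how this turns a scale-down cliff into gradual steps:

```python
import math

def max_scale_down_steps(current: int, target: int,
                         percent: float = 10, period_s: int = 60):
    """Simulate stepwise scale-down limited to `percent` of current
    replicas per period (simplified model of the HPA policy above)."""
    steps = []
    elapsed = 0
    while current > target:
        removable = max(1, math.floor(current * percent / 100))
        current = max(target, current - removable)
        elapsed += period_s
        steps.append((elapsed, current))
    return steps

# Going from 10 replicas to 1 takes nine one-minute steps instead of one cliff
for t, replicas in max_scale_down_steps(current=10, target=1):
    print(f"t={t:>4}s replicas={replicas}")
```

Gradual scale-down gives in-flight requests time to drain and avoids immediately re-provisioning expensive GPU nodes when a brief traffic lull ends.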

GPU-Optimized AI Agent Implementation

Optimize your AI agent code for GPU acceleration:

# src/gpu_agent_server.py
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
import logging
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import psutil
import GPUtil

logger = logging.getLogger(__name__)

@dataclass
class GPUMetrics:
    gpu_utilization: float
    memory_used: float
    memory_total: float
    temperature: float
    power_usage: float

class GPUOptimizedAgent:
    def __init__(self, model_name: str, max_batch_size: int = 8):
        self.model_name = model_name
        self.max_batch_size = max_batch_size
        self.device = self._setup_device()
        
        # Load model and tokenizer
        self.tokenizer, self.model = self._load_model()
        
        # Optimization settings
        self._setup_optimizations()
        
        # Request batching
        self.batch_queue = asyncio.Queue()
        self.batch_processor_task = None
        
        # Metrics tracking
        self.metrics = {
            'requests_processed': 0,
            'batch_size_avg': 0.0,
            'inference_time_avg': 0.0,
            'gpu_utilization_avg': 0.0
        }
        
    def _setup_device(self) -> torch.device:
        """Setup CUDA device with optimizations"""
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA not available")
            
        device_count = torch.cuda.device_count()
        logger.info(f"Found {device_count} CUDA devices")
        
        # Use first GPU for simplicity
        device = torch.device("cuda:0")
        
        # Set device and optimize
        torch.cuda.set_device(device)
        
        # Enable cuDNN auto-tuner for consistent input sizes
        torch.backends.cudnn.benchmark = True
        
        # Enable TensorFloat-32 on Ampere and newer GPUs (compute capability >= 8.0)
        if torch.cuda.get_device_capability(device)[0] >= 8:
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.allow_tf32 = True
            logger.info("Enabled TF32 for improved performance")
            
        return device
        
    def _load_model(self) -> tuple:
        """Load model with GPU optimizations"""
        logger.info(f"Loading model {self.model_name}")
        
        # Load tokenizer; left-pad so batched generation appends tokens correctly
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = "left"
        
        # Load model with optimizations
        model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float16,  # Use half precision for memory efficiency
            device_map="auto",          # Automatic device mapping
            trust_remote_code=True,
            low_cpu_mem_usage=True      # Reduce CPU memory usage during loading
        )
        
        # Compile model for optimization (PyTorch 2.0+)
        if hasattr(torch, 'compile'):
            try:
                model = torch.compile(model, mode="reduce-overhead")
                logger.info("Model compiled with torch.compile")
            except Exception as e:
                logger.warning(f"Model compilation failed: {e}")
        
        model.eval()  # Set to evaluation mode
        
        # Warm up the model
        self._warmup_model(tokenizer, model)
        
        return tokenizer, model
        
    def _setup_optimizations(self):
        """Setup additional GPU optimizations"""
        
        # Pre-allocate GPU memory to avoid fragmentation
        torch.cuda.empty_cache()
        
        # Cap this process's GPU memory share, leaving headroom for the driver
        if hasattr(torch.cuda, 'set_per_process_memory_fraction'):
            torch.cuda.set_per_process_memory_fraction(0.9)  # Use 90% of GPU memory
            
    def _warmup_model(self, tokenizer, model):
        """Warm up model with sample inputs"""
        logger.info("Warming up model...")
        
        sample_texts = [
            "Hello, how are you?",
            "What is artificial intelligence?",
            "Explain machine learning in simple terms."
        ]
        
        with torch.no_grad():
            for text in sample_texts:
                inputs = tokenizer(
                    text,
                    return_tensors="pt",
                    padding=True,
                    truncation=True,
                    max_length=512
                ).to(self.device)
                
                _ = model.generate(
                    inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    max_new_tokens=50,
                    do_sample=False,
                    pad_token_id=tokenizer.eos_token_id
                )
                
        torch.cuda.empty_cache()
        logger.info("Model warmup completed")
        
    async def start_batch_processor(self):
        """Start background batch processing"""
        self.batch_processor_task = asyncio.create_task(self._batch_processor())
        
    async def _batch_processor(self):
        """Process requests in batches for GPU efficiency"""
        while True:
            try:
                # Collect requests for batching
                batch = []
                timeout = 0.1  # 100ms timeout for batch collection
                
                # Get first request
                try:
                    request = await asyncio.wait_for(
                        self.batch_queue.get(), 
                        timeout=timeout
                    )
                    batch.append(request)
                except asyncio.TimeoutError:
                    continue
                
                # Collect additional requests up to batch size
                while len(batch) < self.max_batch_size:
                    try:
                        request = await asyncio.wait_for(
                            self.batch_queue.get(), 
                            timeout=0.01  # Very short timeout for additional requests
                        )
                        batch.append(request)
                    except asyncio.TimeoutError:
                        break
                
                # Process batch
                if batch:
                    await self._process_batch(batch)
                    
            except Exception as e:
                logger.error(f"Batch processing error: {e}")
                
    async def _process_batch(self, batch: List[Dict[str, Any]]):
        """Process a batch of requests efficiently"""
        start_time = time.time()
        
        try:
            # Extract texts and futures
            texts = [req['text'] for req in batch]
            futures = [req['future'] for req in batch]
            
            # Tokenize batch
            inputs = self.tokenizer(
                texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(self.device)
            
            # Run the blocking generate call in a worker thread so the
            # asyncio event loop stays responsive while the GPU works
            def _generate():
                with torch.no_grad():
                    return self.model.generate(
                        inputs.input_ids,
                        attention_mask=inputs.attention_mask,
                        max_new_tokens=150,
                        do_sample=True,
                        temperature=0.7,
                        top_p=0.9,
                        pad_token_id=self.tokenizer.eos_token_id,
                        eos_token_id=self.tokenizer.eos_token_id
                    )

            loop = asyncio.get_running_loop()
            outputs = await loop.run_in_executor(None, _generate)
            
            # Decode responses
            responses = []
            for i, output in enumerate(outputs):
                # Skip input tokens
                response_tokens = output[inputs.input_ids[i].shape[0]:]
                response = self.tokenizer.decode(
                    response_tokens, 
                    skip_special_tokens=True
                ).strip()
                responses.append(response)
            
            # Set results for futures
            for future, response in zip(futures, responses):
                if not future.cancelled():
                    future.set_result(response)
                    
            # Update metrics
            batch_time = time.time() - start_time
            self._update_metrics(len(batch), batch_time)
            
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            # Set exception for all futures
            for req in batch:
                future = req['future']
                if not future.cancelled():
                    future.set_exception(e)
    
    async def process_request(self, text: str) -> str:
        """Process single request through batching system"""
        future = asyncio.get_running_loop().create_future()
        
        request = {
            'text': text,
            'future': future,
            'timestamp': time.time()
        }
        
        await self.batch_queue.put(request)
        
        try:
            # Wait for result with timeout
            response = await asyncio.wait_for(future, timeout=30.0)
            return response
        except asyncio.TimeoutError:
            logger.error("Request timeout")
            raise
    
    def get_gpu_metrics(self) -> GPUMetrics:
        """Get current GPU metrics"""
        try:
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu = gpus[0]  # First GPU
                return GPUMetrics(
                    gpu_utilization=gpu.load * 100,
                    memory_used=gpu.memoryUsed,
                    memory_total=gpu.memoryTotal,
                    temperature=gpu.temperature,
                    power_usage=getattr(gpu, 'powerDraw', 0)
                )
        except Exception as e:
            logger.warning(f"Failed to get GPU metrics: {e}")
            
        return GPUMetrics(0, 0, 0, 0, 0)
    
    def _update_metrics(self, batch_size: int, batch_time: float):
        """Update performance metrics"""
        self.metrics['requests_processed'] += batch_size
        
        # Running average for batch size
        total_requests = self.metrics['requests_processed']
        self.metrics['batch_size_avg'] = (
            (self.metrics['batch_size_avg'] * (total_requests - batch_size) + 
             batch_size * batch_size) / total_requests
        )
        
        # Running average for inference time per request
        time_per_request = batch_time / batch_size
        self.metrics['inference_time_avg'] = (
            (self.metrics['inference_time_avg'] * (total_requests - batch_size) + 
             time_per_request * batch_size) / total_requests
        )
        
        # Update GPU utilization
        gpu_metrics = self.get_gpu_metrics()
        self.metrics['gpu_utilization_avg'] = (
            (self.metrics['gpu_utilization_avg'] * (total_requests - batch_size) + 
             gpu_metrics.gpu_utilization * batch_size) / total_requests
        )

# Usage example
async def main():
    agent = GPUOptimizedAgent("microsoft/DialoGPT-medium", max_batch_size=4)
    await agent.start_batch_processor()
    
    # Test concurrent requests
    tasks = []
    test_inputs = [
        "Hello, how are you?",
        "What is machine learning?",
        "Explain neural networks",
        "Tell me about AI safety"
    ]
    
    for text in test_inputs:
        task = agent.process_request(text)
        tasks.append(task)
    
    responses = await asyncio.gather(*tasks)
    
    for i, response in enumerate(responses):
        print(f"Response {i+1}: {response}")
    
    # Print metrics
    print(f"Metrics: {agent.metrics}")
    print(f"GPU Metrics: {agent.get_gpu_metrics()}")

if __name__ == "__main__":
    asyncio.run(main())
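The batch processor exists because GPU inference carries a significant fixed per-launch cost that batching amortizes. A toy latency model (the overhead and per-item constants are hypothetical, not measured) illustrates the effect:

```python
def batched_throughput(batch_size: int,
                       fixed_overhead_ms: float = 20.0,
                       per_item_ms: float = 2.0) -> float:
    """Requests/second under a simple 'fixed + linear' batch latency model."""
    batch_latency_ms = fixed_overhead_ms + per_item_ms * batch_size
    return batch_size / (batch_latency_ms / 1000)

for bs in (1, 2, 4, 8):
    print(f"batch={bs}: {batched_throughput(bs):.0f} req/s")
```

Because the fixed overhead is paid once per batch rather than once per request, an 8-item batch yields well over four times the throughput of unbatched serving in this model, at the cost of the small queuing delay the batch collector introduces.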

GPU Memory Management

Optimize GPU memory usage:

# src/gpu_memory_manager.py
import torch
import gc
import logging
from typing import Dict, Any, Optional
from contextlib import contextmanager

logger = logging.getLogger(__name__)

class GPUMemoryManager:
    def __init__(self):
        self.device = torch.cuda.current_device()
        self.initial_memory = torch.cuda.memory_allocated(self.device)
        
    def get_memory_stats(self) -> Dict[str, Any]:
        """Get detailed GPU memory statistics"""
        return {
            'allocated': torch.cuda.memory_allocated(self.device),
            'cached': torch.cuda.memory_reserved(self.device),
            'max_allocated': torch.cuda.max_memory_allocated(self.device),
            'max_cached': torch.cuda.max_memory_reserved(self.device),
        }
    
    def optimize_memory(self):
        """Optimize GPU memory usage"""
        
        # Clear Python garbage collector
        gc.collect()
        
        # Clear PyTorch cache
        torch.cuda.empty_cache()
        
        # Reset peak memory stats
        torch.cuda.reset_peak_memory_stats(self.device)
        
        logger.info("GPU memory optimized")
    
    @contextmanager
    def memory_context(self, operation_name: str = "operation"):
        """Context manager to track memory usage"""
        
        start_memory = torch.cuda.memory_allocated(self.device)
        start_cached = torch.cuda.memory_reserved(self.device)
        
        logger.info(f"Starting {operation_name} - Memory: {start_memory / 1e6:.1f}MB")
        
        try:
            yield
        finally:
            end_memory = torch.cuda.memory_allocated(self.device)
            end_cached = torch.cuda.memory_reserved(self.device)
            
            memory_diff = end_memory - start_memory
            cached_diff = end_cached - start_cached
            
            logger.info(
                f"Finished {operation_name} - "
                f"Memory delta: {memory_diff / 1e6:.1f}MB, "
                f"Cache delta: {cached_diff / 1e6:.1f}MB"
            )
    
    def monitor_memory_usage(self) -> bool:
        """Monitor memory usage and return True if optimization needed"""
        
        stats = self.get_memory_stats()
        if stats['cached'] == 0:
            return False
        memory_usage_ratio = stats['allocated'] / stats['cached']
        
        # If allocated memory is less than 70% of cached, optimization may help
        if memory_usage_ratio < 0.7 and stats['cached'] > 1e9:  # 1GB threshold
            logger.info(
                f"Memory fragmentation detected: "
                f"{memory_usage_ratio:.2%} usage ratio, "
                f"{stats['cached'] / 1e9:.1f}GB cached"
            )
            return True
            
        return False

# Usage in agent
class MemoryOptimizedAgent(GPUOptimizedAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory_manager = GPUMemoryManager()
        
    async def _process_batch(self, batch):
        """Process batch with memory management"""
        
        with self.memory_manager.memory_context("batch_processing"):
            await super()._process_batch(batch)
            
            # Check if memory optimization is needed
            if self.memory_manager.monitor_memory_usage():
                self.memory_manager.optimize_memory()

Cost Optimization Strategies

Implement cost-effective GPU scheduling:

# src/gpu_scheduler.py
import asyncio
import logging
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import boto3

logger = logging.getLogger(__name__)

@dataclass
class GPUInstance:
    instance_id: str
    instance_type: str
    state: str
    gpu_type: str
    cost_per_hour: float
    utilization: float
    last_activity: datetime

class GPUScheduler:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.cloudwatch = boto3.client('cloudwatch')
        
        # Cost optimization settings
        self.idle_threshold = 5  # minutes
        self.utilization_threshold = 10  # percent
        self.scale_up_threshold = 80  # percent
        self.scale_down_threshold = 20  # percent
        
    async def optimize_gpu_fleet(self):
        """Continuously optimize GPU fleet for cost"""
        
        while True:
            try:
                # Get current GPU instances
                instances = await self._get_gpu_instances()
                
                # Analyze utilization and make scaling decisions
                await self._analyze_and_scale(instances)
                
                # Sleep before next optimization cycle
                await asyncio.sleep(300)  # 5 minutes
                
            except Exception as e:
                logger.error(f"GPU optimization error: {e}")
                await asyncio.sleep(60)
    
    async def _get_gpu_instances(self) -> List[GPUInstance]:
        """Get current GPU instances with utilization"""
        
        # Get EC2 instances with GPU
        response = self.ec2.describe_instances(
            Filters=[
                {'Name': 'instance-type', 'Values': ['g4dn.*', 'g5.*', 'p3.*', 'p4d.*']},
                {'Name': 'instance-state-name', 'Values': ['running', 'stopped']}
            ]
        )
        
        instances = []
        
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                instance_id = instance['InstanceId']
                instance_type = instance['InstanceType']
                state = instance['State']['Name']
                
                # Get GPU utilization from CloudWatch
                utilization = await self._get_gpu_utilization(instance_id)
                
                # Map instance type to GPU info
                gpu_info = self._get_gpu_info(instance_type)
                
                instances.append(GPUInstance(
                    instance_id=instance_id,
                    instance_type=instance_type,
                    state=state,
                    gpu_type=gpu_info['gpu_type'],
                    cost_per_hour=gpu_info['cost_per_hour'],
                    utilization=utilization,
                    last_activity=datetime.now()  # Would track actual activity
                ))
        
        return instances
    
    async def _get_gpu_utilization(self, instance_id: str) -> float:
        """Get GPU utilization from CloudWatch"""
        
        try:
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=10)
            
            response = self.cloudwatch.get_metric_statistics(
                Namespace='CWAgent',
                MetricName='nvidia_gpu_utilization',
                Dimensions=[
                    {'Name': 'InstanceId', 'Value': instance_id}
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Average']
            )
            
            if response['Datapoints']:
                # Datapoints are not guaranteed to be ordered; take the newest
                latest = max(response['Datapoints'], key=lambda d: d['Timestamp'])
                return latest['Average']
                
        except Exception as e:
            logger.warning(f"Failed to get GPU utilization for {instance_id}: {e}")
            
        return 0.0
    
    async def _analyze_and_scale(self, instances: List[GPUInstance]):
        """Analyze instances and make scaling decisions"""
        
        running_instances = [i for i in instances if i.state == 'running']
        
        if not running_instances:
            logger.info("No running GPU instances found")
            return
        
        total_utilization = sum(i.utilization for i in running_instances)
        avg_utilization = total_utilization / len(running_instances)
        
        logger.info(f"GPU fleet utilization: {avg_utilization:.1f}% across {len(running_instances)} instances")
        
        # Scale down idle instances
        idle_instances = [
            i for i in running_instances 
            if i.utilization < self.utilization_threshold
        ]
        
        if idle_instances:
            await self._scale_down_idle_instances(idle_instances)
        
        # Scale up if high utilization
        if avg_utilization > self.scale_up_threshold:
            await self._scale_up_fleet()
    
    async def _scale_down_idle_instances(self, idle_instances: List[GPUInstance]):
        """Scale down idle GPU instances"""
        
        for instance in idle_instances:
            logger.info(
                f"Stopping idle GPU instance {instance.instance_id} "
                f"(utilization: {instance.utilization:.1f}%)"
            )
            
            try:
                self.ec2.stop_instances(InstanceIds=[instance.instance_id])
                
                # Send cost savings notification
                await self._send_cost_notification(
                    f"Stopped idle GPU instance {instance.instance_id}, "
                    f"saving ${instance.cost_per_hour:.2f}/hour"
                )
                
            except Exception as e:
                logger.error(f"Failed to stop instance {instance.instance_id}: {e}")
    
    async def _scale_up_fleet(self):
        """Scale up GPU fleet for high demand"""
        
        # Implementation would start additional GPU instances
        # or increase Kubernetes node pool size
        logger.info("High GPU utilization detected - scaling up")
    
    def _get_gpu_info(self, instance_type: str) -> Dict[str, Any]:
        """Get GPU information for instance type"""
        
        gpu_info_map = {
            'g4dn.xlarge': {'gpu_type': 'T4', 'cost_per_hour': 0.526},
            'g4dn.2xlarge': {'gpu_type': 'T4', 'cost_per_hour': 0.752},
            'g5.xlarge': {'gpu_type': 'A10G', 'cost_per_hour': 1.006},
            'g5.2xlarge': {'gpu_type': 'A10G', 'cost_per_hour': 1.212},
            'p3.2xlarge': {'gpu_type': 'V100', 'cost_per_hour': 3.06},
            'p4d.24xlarge': {'gpu_type': 'A100', 'cost_per_hour': 32.77},
        }
        
        return gpu_info_map.get(instance_type, {
            'gpu_type': 'Unknown',
            'cost_per_hour': 0.0
        })
    
    async def _send_cost_notification(self, message: str):
        """Send cost optimization notification"""
        # Implementation would send to SNS, Slack, etc.
        logger.info(f"Cost notification: {message}")

# Spot instance management
class SpotInstanceManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        
    def request_spot_instances(self, instance_type: str,
                               max_price: str, count: int = 1):
        """Request spot instances for GPU workloads"""
        
        try:
            response = self.ec2.request_spot_instances(
                SpotPrice=max_price,
                InstanceCount=count,
                Type='one-time',
                LaunchSpecification={
                    'ImageId': 'ami-0c02fb55956c7d316',  # Example; use a GPU-enabled AMI for your region
                    'InstanceType': instance_type,
                    'SecurityGroupIds': ['sg-gpu-agents'],
                    'UserData': self._get_user_data_script(),
                    'IamInstanceProfile': {
                        'Name': 'GPU-Agent-Instance-Profile'
                    }
                }
            )
            
            logger.info(f"Requested {count} {instance_type} spot instances")
            return response['SpotInstanceRequests']
            
        except Exception as e:
            logger.error(f"Spot instance request failed: {e}")
            raise
    
    def _get_user_data_script(self) -> str:
        """Get user data script for GPU instance initialization"""
        
        return """#!/bin/bash
        # Install NVIDIA drivers
        apt-get update
        apt-get install -y nvidia-driver-535
        
        # Install Docker with NVIDIA support
        curl -fsSL https://get.docker.com -o get-docker.sh
        sh get-docker.sh
        
        # Install nvidia-docker2
        distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
        curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | apt-key add -
        curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | tee /etc/apt/sources.list.d/nvidia-docker.list
        
        apt-get update
        apt-get install -y nvidia-docker2
        systemctl restart docker
        
        # Pull and start AI agent container
        docker pull your-registry/ai-agent:gpu-latest
        docker run -d --gpus all -p 8000:8000 your-registry/ai-agent:gpu-latest
        """

Performance Monitoring and Optimization

Monitor GPU performance and costs:

# src/gpu_monitoring.py
import psutil
import GPUtil
import time
import logging
from typing import Dict, Any, List
import boto3
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class PerformanceMetrics:
    timestamp: float
    gpu_utilization: float
    gpu_memory_used: float
    gpu_memory_total: float
    gpu_temperature: float
    throughput_requests_per_second: float
    average_latency_ms: float
    cost_per_hour: float
    efficiency_score: float

class GPUPerformanceMonitor:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.metrics_history = []
        
    def collect_metrics(self) -> PerformanceMetrics:
        """Collect comprehensive GPU performance metrics"""
        
        # Get GPU metrics
        gpus = GPUtil.getGPUs()
        gpu = gpus[0] if gpus else None
        
        if gpu:
            gpu_util = gpu.load * 100
            gpu_memory_used = gpu.memoryUsed
            gpu_memory_total = gpu.memoryTotal
            gpu_temp = gpu.temperature
        else:
            gpu_util = gpu_memory_used = gpu_memory_total = gpu_temp = 0
        
        # Calculate efficiency score (requests per dollar)
        cost_per_hour = self._get_instance_cost()
        efficiency_score = self._calculate_efficiency(gpu_util, cost_per_hour)
        
        metrics = PerformanceMetrics(
            timestamp=time.time(),
            gpu_utilization=gpu_util,
            gpu_memory_used=gpu_memory_used,
            gpu_memory_total=gpu_memory_total,
            gpu_temperature=gpu_temp,
            throughput_requests_per_second=0,  # Would be calculated from request logs
            average_latency_ms=0,  # Would be calculated from request logs
            cost_per_hour=cost_per_hour,
            efficiency_score=efficiency_score
        )
        
        self.metrics_history.append(metrics)
        
        # Keep last 1000 metrics
        if len(self.metrics_history) > 1000:
            self.metrics_history = self.metrics_history[-1000:]
        
        return metrics
    
    def _calculate_efficiency(self, gpu_utilization: float,
                              cost_per_hour: float) -> float:
        """Calculate efficiency score"""
        
        if cost_per_hour == 0:
            return 0
        
        # Simple efficiency: utilization per dollar
        efficiency = gpu_utilization / cost_per_hour
        return efficiency
    
    def _get_instance_cost(self) -> float:
        """Get current instance hourly cost"""
        
        # Would implement logic to determine current instance cost
        # Based on instance type, region, spot vs on-demand, etc.
        return 0.526  # Example: g4dn.xlarge cost
    
    def publish_metrics_to_cloudwatch(self, metrics: PerformanceMetrics):
        """Publish metrics to CloudWatch"""
        
        try:
            metric_data = [
                {
                    'MetricName': 'GPUUtilization',
                    'Value': metrics.gpu_utilization,
                    'Unit': 'Percent',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': 'ai-agent-gpu'}
                    ]
                },
                {
                    'MetricName': 'GPUMemoryUtilization',
                    'Value': (metrics.gpu_memory_used / metrics.gpu_memory_total) * 100 if metrics.gpu_memory_total > 0 else 0,
                    'Unit': 'Percent',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': 'ai-agent-gpu'}
                    ]
                },
                {
                    'MetricName': 'CostEfficiency',
                    'Value': metrics.efficiency_score,
                    'Unit': 'None',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': 'ai-agent-gpu'}
                    ]
                }
            ]
            
            self.cloudwatch.put_metric_data(
                Namespace='AI/GPU/Performance',
                MetricData=metric_data
            )
            
        except Exception as e:
            logger.error(f"Failed to publish metrics: {e}")
    
    def generate_optimization_recommendations(self) -> List[str]:
        """Generate optimization recommendations based on metrics"""
        
        if not self.metrics_history:
            return []
        
        recommendations = []
        recent_metrics = self.metrics_history[-10:]  # Last 10 data points
        
        # Calculate averages
        avg_utilization = sum(m.gpu_utilization for m in recent_metrics) / len(recent_metrics)
        avg_memory_usage = sum(
            (m.gpu_memory_used / m.gpu_memory_total) if m.gpu_memory_total > 0 else 0
            for m in recent_metrics
        ) / len(recent_metrics) * 100
        
        # Low utilization recommendation
        if avg_utilization < 30:
            recommendations.append(
                f"Low GPU utilization ({avg_utilization:.1f}%). "
                "Consider using smaller instance type or implementing request batching."
            )
        
        # High memory usage recommendation  
        if avg_memory_usage > 90:
            recommendations.append(
                f"High GPU memory usage ({avg_memory_usage:.1f}%). "
                "Consider model optimization or larger GPU memory."
            )
        
        # Temperature warning
        recent_temp = recent_metrics[-1].gpu_temperature
        if recent_temp > 85:
            recommendations.append(
                f"High GPU temperature ({recent_temp}°C). "
                "Consider reducing workload or checking cooling."
            )
        
        return recommendations
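The throughput_requests_per_second and average_latency_ms fields above are left as placeholders to be derived from request logs. One way to compute them, sketched below under the assumption that each completed request is recorded as a (timestamp, latency_ms) tuple, is a simple sliding-window aggregation:

```python
import time

def compute_request_metrics(request_log, window_seconds=60.0, now=None):
    """Compute throughput (requests/sec) and average latency (ms)
    over the trailing window.

    request_log: list of (timestamp, latency_ms) tuples, one per
    completed request (assumed format for this sketch).
    """
    now = time.time() if now is None else now
    cutoff = now - window_seconds
    # Keep only requests that completed inside the window
    recent = [(ts, lat) for ts, lat in request_log if ts >= cutoff]
    if not recent:
        return 0.0, 0.0
    throughput = len(recent) / window_seconds
    avg_latency = sum(lat for _, lat in recent) / len(recent)
    return throughput, avg_latency

# Example: 3 requests completed in the last 60-second window
log = [(100.0, 50.0), (130.0, 70.0), (150.0, 90.0)]
print(compute_request_metrics(log, window_seconds=60.0, now=155.0))
# → (0.05, 70.0)
```

Feeding these two values into `collect_metrics` in place of the zeroed placeholders makes the efficiency score reflect actual serving work rather than raw utilization alone.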

Best Practices Checklist

Instance Selection: Choose GPU instances based on actual model requirements
Memory Optimization: Use half-precision, model compilation, and memory management
Batch Processing: Implement request batching for GPU efficiency
Cost Management: Use spot instances and auto-scaling for cost optimization
Monitoring: Track GPU utilization, memory, temperature, and costs
Container Optimization: Use NVIDIA base images and proper resource limits
Model Optimization: Use TensorRT, quantization, and model pruning where applicable
Fallback Strategy: Have CPU fallback for GPU failures or unavailability
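Of the items above, request batching is often the single biggest lever for GPU efficiency, since grouping requests into one forward pass amortizes kernel-launch and memory-transfer overhead. A minimal sketch of the grouping logic (the batch size cap is an assumed tuning parameter; a production batcher would also enforce a max-wait timeout so lone requests are not stranded):

```python
def make_batches(pending_requests, max_batch_size=8):
    """Group pending requests into batches of at most max_batch_size,
    preserving arrival order, so each batch maps to one GPU forward pass."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be >= 1")
    return [
        pending_requests[i:i + max_batch_size]
        for i in range(0, len(pending_requests), max_batch_size)
    ]

# 10 queued prompts with a batch cap of 4 → batches of 4, 4, and 2
queue = [f"req-{i}" for i in range(10)]
batches = make_batches(queue, max_batch_size=4)
print([len(b) for b in batches])  # → [4, 4, 2]
```

The right cap depends on GPU memory and latency targets: larger batches raise utilization but add queuing delay, so tune it against the monitoring data collected above.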

Next Steps

GPU acceleration can dramatically improve AI agent performance, but it requires careful optimization and cost management. The final article in this series brings everything together: comprehensive production monitoring and observability for your fully optimized, cloud-deployed AI agent systems.

Remember: GPU acceleration is not always the answer. Profile your workloads, measure actual performance gains, and optimize for your specific use case and budget constraints.