GPU Acceleration for AI Agents: Performance Optimization in the Cloud
GPU acceleration can dramatically improve AI agent performance, reducing inference latency from seconds to milliseconds and enabling real-time interactive experiences. However, GPU resources in the cloud require specialized configuration, careful cost management, and performance optimization techniques that differ significantly from CPU-based deployments.
GPU vs CPU Performance Characteristics
Understanding when GPU acceleration provides meaningful benefits:
GPU Advantages:
- Parallel processing: Thousands of cores for matrix operations
- High memory bandwidth: 900+ GB/s vs 100 GB/s for CPU
- Optimized for inference: Hardware-accelerated AI operations
- Batch processing: Efficient handling of multiple simultaneous requests
GPU Considerations:
- Cost: 3-10x more expensive than equivalent CPU resources
- Memory limitations: Fixed GPU memory that can’t be dynamically allocated
- Cold start overhead: GPU initialization adds startup latency
- Utilization efficiency: An underutilized GPU still bills at full price, so idle capacity is pure waste
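The utilization point deserves emphasis: a cheap GPU that sits mostly idle can cost more per useful compute-hour than a pricier GPU that stays busy. A quick sketch of that arithmetic (all figures are illustrative assumptions, not benchmarks):

```python
# Effective cost of useful GPU time: hourly price divided by the
# fraction of time the GPU is actually doing work.
# All numbers below are illustrative, not measured.

def effective_cost_per_useful_hour(cost_per_hour: float, utilization: float) -> float:
    """Hourly price divided by the fraction of time spent on real work."""
    if utilization <= 0:
        return float("inf")  # an idle GPU delivers no useful hours
    return cost_per_hour / utilization

# Hypothetical: a T4 at 15% utilization vs an A10G kept busy by batching
idle_t4 = effective_cost_per_useful_hour(0.526, 0.15)    # ~$3.51 per useful hour
busy_a10g = effective_cost_per_useful_hour(1.006, 0.80)  # ~$1.26 per useful hour
print(f"Idle T4: ${idle_t4:.2f}/useful-hour, busy A10G: ${busy_a10g:.2f}/useful-hour")
```

Under these assumed numbers, the nominally cheaper instance is nearly three times as expensive per useful hour, which is why the batching techniques later in this article matter as much as instance selection.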
Cloud GPU Options Comparison
| Provider | Instance Type | GPU | GPU Memory | Cost/Hour (on-demand, approx.) | Best For |
|---|---|---|---|---|---|
| AWS | g4dn.xlarge | T4 | 16GB | $0.526 | Light inference |
| AWS | g5.2xlarge | A10G | 24GB | $1.006 | Balanced workloads |
| AWS | p4d.24xlarge | 8× A100 | 8× 40GB | $32.77 | Heavy training/inference |
| GCP | n1-standard-4-1-tesla-t4 | T4 | 16GB | $0.35 | Development/testing |
| GCP | a2-highgpu-1g | A100 | 40GB | $2.73 | Production inference |
| Azure | NC6s_v3 | V100 | 16GB | $3.06 | Mixed workloads |
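When a model's memory footprint is known, picking from a table like the one above can be automated. A small sketch using the table's values (prices vary by region and change over time, so treat the data as a snapshot):

```python
# Pick the cheapest instance from the comparison table that fits a model's
# GPU memory requirement. Values mirror the table above; prices are
# approximate on-demand rates and will drift.
INSTANCES = [
    ("AWS g4dn.xlarge", "T4", 16, 0.526),
    ("AWS g5.2xlarge", "A10G", 24, 1.006),
    ("GCP n1-standard-4 + T4", "T4", 16, 0.35),
    ("GCP a2-highgpu-1g", "A100", 40, 2.73),
    ("Azure NC6s_v3", "V100", 16, 3.06),
]

def cheapest_with_memory(min_gpu_gb: float):
    """Return the lowest-cost instance with at least min_gpu_gb of GPU memory."""
    candidates = [i for i in INSTANCES if i[2] >= min_gpu_gb]
    return min(candidates, key=lambda i: i[3]) if candidates else None

# A 7B-parameter model in fp16 needs roughly 14 GB for weights alone
print(cheapest_with_memory(14))  # GCP T4 at $0.35/h
print(cheapest_with_memory(30))  # only the A100 option qualifies
```

Remember to budget headroom beyond raw weights: KV cache, activations, and batch size all consume GPU memory on top of the model itself.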
Container Configuration for GPU Workloads
Properly configure containers for GPU access:
# GPU-optimized Dockerfile
FROM nvidia/cuda:12.1.0-devel-ubuntu22.04 AS builder
# Install Python and build dependencies
RUN apt-get update && apt-get install -y \
python3.11 \
python3.11-dev \
python3-pip \
build-essential \
cmake \
git \
&& rm -rf /var/lib/apt/lists/*
# Set Python 3.11 as default
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.11 1
WORKDIR /build
# Install PyTorch with CUDA support
RUN pip3 install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Install other dependencies
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# Production stage
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
# Install runtime dependencies only
RUN apt-get update && apt-get install -y \
python3.11 \
python3.11-distutils \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/dist-packages /usr/local/lib/python3.11/dist-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Create non-root user
RUN groupadd -r aiuser && useradd -r -g aiuser aiuser
WORKDIR /app
# Copy application code
COPY src/ ./src/
COPY models/ ./models/
# Switch to non-root user
USER aiuser
# Set CUDA environment variables
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENV CUDA_VISIBLE_DEVICES=0
# Health check that verifies GPU availability
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
CMD python3 -c "import torch; assert torch.cuda.is_available(); print('GPU available')" || exit 1
EXPOSE 8000
CMD ["python3", "-m", "src.gpu_agent_server"]
Kubernetes GPU Configuration
Configure Kubernetes for GPU workloads:
# GPU node pool configuration (GKE)
apiVersion: v1
kind: ConfigMap
metadata:
name: gpu-config
data:
  gpu-driver-version: "530.30.02"
cuda-version: "12.1"
---
# GPU-enabled deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-agent-gpu
spec:
replicas: 2
selector:
matchLabels:
app: ai-agent-gpu
template:
metadata:
labels:
app: ai-agent-gpu
spec:
# Node selection for GPU nodes
nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-tesla-t4
# Toleration for GPU node taints
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: ai-agent
image: ai-agent:gpu-latest
# GPU resource requests and limits
resources:
requests:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: 1
limits:
memory: "8Gi"
cpu: "4"
nvidia.com/gpu: 1
env:
- name: CUDA_VISIBLE_DEVICES
value: "0"
- name: NVIDIA_VISIBLE_DEVICES
value: "all"
- name: CUDA_CACHE_DISABLE
value: "0"
- name: CUDA_CACHE_PATH
value: "/tmp/cuda-cache"
# Volume mounts for GPU drivers and cache
volumeMounts:
- name: nvidia-driver
mountPath: /usr/local/nvidia
readOnly: true
- name: cuda-cache
mountPath: /tmp/cuda-cache
# Readiness probe that checks GPU
readinessProbe:
exec:
command:
- python3
- -c
            - "import torch; assert torch.cuda.is_available()"
initialDelaySeconds: 30
periodSeconds: 10
volumes:
- name: nvidia-driver
hostPath:
path: /usr/local/nvidia
- name: cuda-cache
emptyDir:
sizeLimit: 1Gi
---
# Horizontal Pod Autoscaler for GPU workloads
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: ai-agent-gpu-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: ai-agent-gpu
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: nvidia_gpu_utilization
target:
type: AverageValue
averageValue: "80"
# Scale down slowly to avoid thrashing
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
        periodSeconds: 60
GPU-Optimized AI Agent Implementation
Optimize your AI agent code for GPU acceleration:
# src/gpu_agent_server.py
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
import logging
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import psutil
import GPUtil
logger = logging.getLogger(__name__)
@dataclass
class GPUMetrics:
gpu_utilization: float
memory_used: float
memory_total: float
temperature: float
power_usage: float
class GPUOptimizedAgent:
def __init__(self, model_name: str, max_batch_size: int = 8):
self.model_name = model_name
self.max_batch_size = max_batch_size
self.device = self._setup_device()
# Load model and tokenizer
self.tokenizer, self.model = self._load_model()
# Optimization settings
self._setup_optimizations()
# Request batching
self.batch_queue = asyncio.Queue()
self.batch_processor_task = None
# Metrics tracking
self.metrics = {
'requests_processed': 0,
'batch_size_avg': 0.0,
'inference_time_avg': 0.0,
'gpu_utilization_avg': 0.0
}
def _setup_device(self) -> torch.device:
"""Setup CUDA device with optimizations"""
if not torch.cuda.is_available():
raise RuntimeError("CUDA not available")
device_count = torch.cuda.device_count()
logger.info(f"Found {device_count} CUDA devices")
# Use first GPU for simplicity
device = torch.device("cuda:0")
# Set device and optimize
torch.cuda.set_device(device)
# Enable cuDNN auto-tuner for consistent input sizes
torch.backends.cudnn.benchmark = True
# Enable TensorFloat-32 for A100 GPUs
if torch.cuda.get_device_capability(device)[0] >= 8:
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
logger.info("Enabled TF32 for improved performance")
return device
def _load_model(self) -> tuple:
"""Load model with GPU optimizations"""
logger.info(f"Loading model {self.model_name}")
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
tokenizer.pad_token = tokenizer.eos_token
# Load model with optimizations
model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=torch.float16, # Use half precision for memory efficiency
device_map="auto", # Automatic device mapping
trust_remote_code=True,
low_cpu_mem_usage=True # Reduce CPU memory usage during loading
)
# Compile model for optimization (PyTorch 2.0+)
if hasattr(torch, 'compile'):
try:
model = torch.compile(model, mode="reduce-overhead")
logger.info("Model compiled with torch.compile")
except Exception as e:
logger.warning(f"Model compilation failed: {e}")
model.eval() # Set to evaluation mode
# Warm up the model
self._warmup_model(tokenizer, model)
return tokenizer, model
def _setup_optimizations(self):
"""Setup additional GPU optimizations"""
# Pre-allocate GPU memory to avoid fragmentation
torch.cuda.empty_cache()
        # Cap this process at ~90% of GPU memory to leave headroom for the driver
        if hasattr(torch.cuda, 'set_per_process_memory_fraction'):
            torch.cuda.set_per_process_memory_fraction(0.9)
def _warmup_model(self, tokenizer, model):
"""Warm up model with sample inputs"""
logger.info("Warming up model...")
sample_texts = [
"Hello, how are you?",
"What is artificial intelligence?",
"Explain machine learning in simple terms."
]
with torch.no_grad():
for text in sample_texts:
inputs = tokenizer(
text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
).to(self.device)
_ = model.generate(
inputs.input_ids,
attention_mask=inputs.attention_mask,
max_new_tokens=50,
do_sample=False,
pad_token_id=tokenizer.eos_token_id
)
torch.cuda.empty_cache()
logger.info("Model warmup completed")
async def start_batch_processor(self):
"""Start background batch processing"""
self.batch_processor_task = asyncio.create_task(self._batch_processor())
async def _batch_processor(self):
"""Process requests in batches for GPU efficiency"""
while True:
try:
# Collect requests for batching
batch = []
timeout = 0.1 # 100ms timeout for batch collection
# Get first request
try:
request = await asyncio.wait_for(
self.batch_queue.get(),
timeout=timeout
)
batch.append(request)
except asyncio.TimeoutError:
continue
# Collect additional requests up to batch size
while len(batch) < self.max_batch_size:
try:
request = await asyncio.wait_for(
self.batch_queue.get(),
timeout=0.01 # Very short timeout for additional requests
)
batch.append(request)
except asyncio.TimeoutError:
break
# Process batch
if batch:
await self._process_batch(batch)
except Exception as e:
logger.error(f"Batch processing error: {e}")
async def _process_batch(self, batch: List[Dict[str, Any]]):
"""Process a batch of requests efficiently"""
start_time = time.time()
try:
# Extract texts and futures
texts = [req['text'] for req in batch]
futures = [req['future'] for req in batch]
# Tokenize batch
inputs = self.tokenizer(
texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
).to(self.device)
# Generate responses
with torch.no_grad():
outputs = self.model.generate(
inputs.input_ids,
attention_mask=inputs.attention_mask,
max_new_tokens=150,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
# Decode responses
responses = []
for i, output in enumerate(outputs):
# Skip input tokens
response_tokens = output[inputs.input_ids[i].shape[0]:]
response = self.tokenizer.decode(
response_tokens,
skip_special_tokens=True
).strip()
responses.append(response)
# Set results for futures
for future, response in zip(futures, responses):
if not future.cancelled():
future.set_result(response)
# Update metrics
batch_time = time.time() - start_time
self._update_metrics(len(batch), batch_time)
except Exception as e:
logger.error(f"Batch processing failed: {e}")
# Set exception for all futures
for req in batch:
future = req['future']
if not future.cancelled():
future.set_exception(e)
async def process_request(self, text: str) -> str:
"""Process single request through batching system"""
        future = asyncio.get_running_loop().create_future()
request = {
'text': text,
'future': future,
'timestamp': time.time()
}
await self.batch_queue.put(request)
try:
# Wait for result with timeout
response = await asyncio.wait_for(future, timeout=30.0)
return response
except asyncio.TimeoutError:
logger.error("Request timeout")
raise
def get_gpu_metrics(self) -> GPUMetrics:
"""Get current GPU metrics"""
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # First GPU
return GPUMetrics(
gpu_utilization=gpu.load * 100,
memory_used=gpu.memoryUsed,
memory_total=gpu.memoryTotal,
temperature=gpu.temperature,
power_usage=getattr(gpu, 'powerDraw', 0)
)
except Exception as e:
logger.warning(f"Failed to get GPU metrics: {e}")
return GPUMetrics(0, 0, 0, 0, 0)
def _update_metrics(self, batch_size: int, batch_time: float):
"""Update performance metrics"""
self.metrics['requests_processed'] += batch_size
# Running average for batch size
total_requests = self.metrics['requests_processed']
self.metrics['batch_size_avg'] = (
(self.metrics['batch_size_avg'] * (total_requests - batch_size) +
batch_size * batch_size) / total_requests
)
# Running average for inference time per request
time_per_request = batch_time / batch_size
self.metrics['inference_time_avg'] = (
(self.metrics['inference_time_avg'] * (total_requests - batch_size) +
time_per_request * batch_size) / total_requests
)
# Update GPU utilization
gpu_metrics = self.get_gpu_metrics()
self.metrics['gpu_utilization_avg'] = (
(self.metrics['gpu_utilization_avg'] * (total_requests - batch_size) +
gpu_metrics.gpu_utilization * batch_size) / total_requests
)
# Usage example
async def main():
agent = GPUOptimizedAgent("microsoft/DialoGPT-medium", max_batch_size=4)
await agent.start_batch_processor()
# Test concurrent requests
tasks = []
test_inputs = [
"Hello, how are you?",
"What is machine learning?",
"Explain neural networks",
"Tell me about AI safety"
]
for text in test_inputs:
task = agent.process_request(text)
tasks.append(task)
responses = await asyncio.gather(*tasks)
for i, response in enumerate(responses):
print(f"Response {i+1}: {response}")
# Print metrics
print(f"Metrics: {agent.metrics}")
print(f"GPU Metrics: {agent.get_gpu_metrics()}")
if __name__ == "__main__":
    asyncio.run(main())
GPU Memory Management
Optimize GPU memory usage:
# src/gpu_memory_manager.py
import torch
import gc
import logging
from typing import Dict, Any, Optional
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class GPUMemoryManager:
def __init__(self):
self.device = torch.cuda.current_device()
self.initial_memory = torch.cuda.memory_allocated(self.device)
def get_memory_stats(self) -> Dict[str, Any]:
"""Get detailed GPU memory statistics"""
return {
'allocated': torch.cuda.memory_allocated(self.device),
'cached': torch.cuda.memory_reserved(self.device),
'max_allocated': torch.cuda.max_memory_allocated(self.device),
'max_cached': torch.cuda.max_memory_reserved(self.device),
}
def optimize_memory(self):
"""Optimize GPU memory usage"""
# Clear Python garbage collector
gc.collect()
# Clear PyTorch cache
torch.cuda.empty_cache()
# Reset peak memory stats
torch.cuda.reset_peak_memory_stats(self.device)
logger.info("GPU memory optimized")
@contextmanager
def memory_context(self, operation_name: str = "operation"):
"""Context manager to track memory usage"""
start_memory = torch.cuda.memory_allocated(self.device)
start_cached = torch.cuda.memory_reserved(self.device)
logger.info(f"Starting {operation_name} - Memory: {start_memory / 1e6:.1f}MB")
try:
yield
finally:
end_memory = torch.cuda.memory_allocated(self.device)
end_cached = torch.cuda.memory_reserved(self.device)
memory_diff = end_memory - start_memory
cached_diff = end_cached - start_cached
logger.info(
f"Finished {operation_name} - "
f"Memory delta: {memory_diff / 1e6:.1f}MB, "
f"Cache delta: {cached_diff / 1e6:.1f}MB"
)
def monitor_memory_usage(self) -> bool:
"""Monitor memory usage and return True if optimization needed"""
stats = self.get_memory_stats()
        memory_usage_ratio = stats['allocated'] / max(stats['cached'], 1)
# If allocated memory is less than 70% of cached, optimization may help
if memory_usage_ratio < 0.7 and stats['cached'] > 1e9: # 1GB threshold
logger.info(
f"Memory fragmentation detected: "
f"{memory_usage_ratio:.2%} usage ratio, "
f"{stats['cached'] / 1e9:.1f}GB cached"
)
return True
return False
# Usage in agent
class MemoryOptimizedAgent(GPUOptimizedAgent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.memory_manager = GPUMemoryManager()
async def _process_batch(self, batch):
"""Process batch with memory management"""
with self.memory_manager.memory_context("batch_processing"):
await super()._process_batch(batch)
# Check if memory optimization is needed
if self.memory_manager.monitor_memory_usage():
            self.memory_manager.optimize_memory()
Cost Optimization Strategies
Implement cost-effective GPU scheduling:
# src/gpu_scheduler.py
import asyncio
import logging
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import boto3
logger = logging.getLogger(__name__)
@dataclass
class GPUInstance:
instance_id: str
instance_type: str
state: str
gpu_type: str
cost_per_hour: float
utilization: float
last_activity: datetime
class GPUScheduler:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.cloudwatch = boto3.client('cloudwatch')
# Cost optimization settings
self.idle_threshold = 5 # minutes
self.utilization_threshold = 10 # percent
self.scale_up_threshold = 80 # percent
self.scale_down_threshold = 20 # percent
async def optimize_gpu_fleet(self):
"""Continuously optimize GPU fleet for cost"""
while True:
try:
# Get current GPU instances
instances = await self._get_gpu_instances()
# Analyze utilization and make scaling decisions
await self._analyze_and_scale(instances)
# Sleep before next optimization cycle
await asyncio.sleep(300) # 5 minutes
except Exception as e:
logger.error(f"GPU optimization error: {e}")
await asyncio.sleep(60)
async def _get_gpu_instances(self) -> List[GPUInstance]:
"""Get current GPU instances with utilization"""
# Get EC2 instances with GPU
response = self.ec2.describe_instances(
Filters=[
{'Name': 'instance-type', 'Values': ['g4dn.*', 'g5.*', 'p3.*', 'p4d.*']},
{'Name': 'instance-state-name', 'Values': ['running', 'stopped']}
]
)
instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
state = instance['State']['Name']
# Get GPU utilization from CloudWatch
utilization = await self._get_gpu_utilization(instance_id)
# Map instance type to GPU info
gpu_info = self._get_gpu_info(instance_type)
instances.append(GPUInstance(
instance_id=instance_id,
instance_type=instance_type,
state=state,
gpu_type=gpu_info['gpu_type'],
cost_per_hour=gpu_info['cost_per_hour'],
utilization=utilization,
last_activity=datetime.now() # Would track actual activity
))
return instances
async def _get_gpu_utilization(self, instance_id: str) -> float:
"""Get GPU utilization from CloudWatch"""
try:
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=10)
response = self.cloudwatch.get_metric_statistics(
Namespace='CWAgent',
MetricName='nvidia_gpu_utilization',
Dimensions=[
{'Name': 'InstanceId', 'Value': instance_id}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average']
)
            if response['Datapoints']:
                # Datapoints are not guaranteed to be ordered; take the newest
                latest = max(response['Datapoints'], key=lambda d: d['Timestamp'])
                return latest['Average']
except Exception as e:
logger.warning(f"Failed to get GPU utilization for {instance_id}: {e}")
return 0.0
async def _analyze_and_scale(self, instances: List[GPUInstance]):
"""Analyze instances and make scaling decisions"""
running_instances = [i for i in instances if i.state == 'running']
if not running_instances:
logger.info("No running GPU instances found")
return
total_utilization = sum(i.utilization for i in running_instances)
avg_utilization = total_utilization / len(running_instances)
logger.info(f"GPU fleet utilization: {avg_utilization:.1f}% across {len(running_instances)} instances")
# Scale down idle instances
idle_instances = [
i for i in running_instances
if i.utilization < self.utilization_threshold
]
if idle_instances:
await self._scale_down_idle_instances(idle_instances)
# Scale up if high utilization
if avg_utilization > self.scale_up_threshold:
await self._scale_up_fleet()
async def _scale_down_idle_instances(self, idle_instances: List[GPUInstance]):
"""Scale down idle GPU instances"""
for instance in idle_instances:
logger.info(
f"Stopping idle GPU instance {instance.instance_id} "
f"(utilization: {instance.utilization:.1f}%)"
)
try:
self.ec2.stop_instances(InstanceIds=[instance.instance_id])
# Send cost savings notification
await self._send_cost_notification(
f"Stopped idle GPU instance {instance.instance_id}, "
f"saving ${instance.cost_per_hour:.2f}/hour"
)
except Exception as e:
logger.error(f"Failed to stop instance {instance.instance_id}: {e}")
async def _scale_up_fleet(self):
"""Scale up GPU fleet for high demand"""
# Implementation would start additional GPU instances
# or increase Kubernetes node pool size
logger.info("High GPU utilization detected - scaling up")
def _get_gpu_info(self, instance_type: str) -> Dict[str, Any]:
"""Get GPU information for instance type"""
gpu_info_map = {
'g4dn.xlarge': {'gpu_type': 'T4', 'cost_per_hour': 0.526},
'g4dn.2xlarge': {'gpu_type': 'T4', 'cost_per_hour': 0.752},
'g5.xlarge': {'gpu_type': 'A10G', 'cost_per_hour': 1.006},
'g5.2xlarge': {'gpu_type': 'A10G', 'cost_per_hour': 1.212},
'p3.2xlarge': {'gpu_type': 'V100', 'cost_per_hour': 3.06},
'p4d.24xlarge': {'gpu_type': 'A100', 'cost_per_hour': 32.77},
}
return gpu_info_map.get(instance_type, {
'gpu_type': 'Unknown',
'cost_per_hour': 0.0
})
async def _send_cost_notification(self, message: str):
"""Send cost optimization notification"""
# Implementation would send to SNS, Slack, etc.
logger.info(f"Cost notification: {message}")
# Spot instance management
class SpotInstanceManager:
def __init__(self):
self.ec2 = boto3.client('ec2')
def request_spot_instances(self, instance_type: str,
max_price: str, count: int = 1):
"""Request spot instances for GPU workloads"""
try:
response = self.ec2.request_spot_instances(
SpotPrice=max_price,
InstanceCount=count,
Type='one-time',
LaunchSpecification={
                    'ImageId': 'ami-0c02fb55956c7d316',  # example ID; use a current GPU-ready AMI for your region
'InstanceType': instance_type,
'SecurityGroupIds': ['sg-gpu-agents'],
'UserData': self._get_user_data_script(),
'IamInstanceProfile': {
'Name': 'GPU-Agent-Instance-Profile'
}
}
)
logger.info(f"Requested {count} {instance_type} spot instances")
return response['SpotInstanceRequests']
except Exception as e:
logger.error(f"Spot instance request failed: {e}")
raise
def _get_user_data_script(self) -> str:
"""Get user data script for GPU instance initialization"""
return """#!/bin/bash
# Install NVIDIA drivers
apt-get update
apt-get install -y nvidia-driver-535
# Install Docker with NVIDIA support
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
# Install nvidia-docker2
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | tee /etc/apt/sources.list.d/nvidia-docker.list
apt-get update
apt-get install -y nvidia-docker2
systemctl restart docker
# Pull and start AI agent container
docker pull your-registry/ai-agent:gpu-latest
docker run -d --gpus all -p 8000:8000 your-registry/ai-agent:gpu-latest
"""
Performance Monitoring and Optimization
Monitor GPU performance and costs:
# src/gpu_monitoring.py
import psutil
import GPUtil
import time
import logging
from typing import Dict, Any, List
import boto3
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetrics:
timestamp: float
gpu_utilization: float
gpu_memory_used: float
gpu_memory_total: float
gpu_temperature: float
throughput_requests_per_second: float
average_latency_ms: float
cost_per_hour: float
efficiency_score: float
class GPUPerformanceMonitor:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.metrics_history = []
def collect_metrics(self) -> PerformanceMetrics:
"""Collect comprehensive GPU performance metrics"""
# Get GPU metrics
gpus = GPUtil.getGPUs()
gpu = gpus[0] if gpus else None
if gpu:
gpu_util = gpu.load * 100
gpu_memory_used = gpu.memoryUsed
gpu_memory_total = gpu.memoryTotal
gpu_temp = gpu.temperature
else:
gpu_util = gpu_memory_used = gpu_memory_total = gpu_temp = 0
# Calculate efficiency score (requests per dollar)
cost_per_hour = self._get_instance_cost()
efficiency_score = self._calculate_efficiency(gpu_util, cost_per_hour)
metrics = PerformanceMetrics(
timestamp=time.time(),
gpu_utilization=gpu_util,
gpu_memory_used=gpu_memory_used,
gpu_memory_total=gpu_memory_total,
gpu_temperature=gpu_temp,
throughput_requests_per_second=0, # Would be calculated from request logs
average_latency_ms=0, # Would be calculated from request logs
cost_per_hour=cost_per_hour,
efficiency_score=efficiency_score
)
self.metrics_history.append(metrics)
# Keep last 1000 metrics
if len(self.metrics_history) > 1000:
self.metrics_history = self.metrics_history[-1000:]
return metrics
def _calculate_efficiency(self, gpu_utilization: float,
cost_per_hour: float) -> float:
"""Calculate efficiency score"""
if cost_per_hour == 0:
return 0
# Simple efficiency: utilization per dollar
efficiency = gpu_utilization / cost_per_hour
return efficiency
def _get_instance_cost(self) -> float:
"""Get current instance hourly cost"""
# Would implement logic to determine current instance cost
# Based on instance type, region, spot vs on-demand, etc.
return 0.526 # Example: g4dn.xlarge cost
def publish_metrics_to_cloudwatch(self, metrics: PerformanceMetrics):
"""Publish metrics to CloudWatch"""
try:
metric_data = [
{
'MetricName': 'GPUUtilization',
'Value': metrics.gpu_utilization,
'Unit': 'Percent',
'Dimensions': [
{'Name': 'Service', 'Value': 'ai-agent-gpu'}
]
},
{
'MetricName': 'GPUMemoryUtilization',
'Value': (metrics.gpu_memory_used / metrics.gpu_memory_total) * 100 if metrics.gpu_memory_total > 0 else 0,
'Unit': 'Percent',
'Dimensions': [
{'Name': 'Service', 'Value': 'ai-agent-gpu'}
]
},
{
'MetricName': 'CostEfficiency',
'Value': metrics.efficiency_score,
'Unit': 'None',
'Dimensions': [
{'Name': 'Service', 'Value': 'ai-agent-gpu'}
]
}
]
self.cloudwatch.put_metric_data(
Namespace='AI/GPU/Performance',
MetricData=metric_data
)
except Exception as e:
logger.error(f"Failed to publish metrics: {e}")
def generate_optimization_recommendations(self) -> List[str]:
"""Generate optimization recommendations based on metrics"""
if not self.metrics_history:
return []
recommendations = []
recent_metrics = self.metrics_history[-10:] # Last 10 data points
# Calculate averages
avg_utilization = sum(m.gpu_utilization for m in recent_metrics) / len(recent_metrics)
        avg_memory_usage = sum(m.gpu_memory_used / max(m.gpu_memory_total, 1) for m in recent_metrics) / len(recent_metrics) * 100
# Low utilization recommendation
if avg_utilization < 30:
recommendations.append(
f"Low GPU utilization ({avg_utilization:.1f}%). "
"Consider using smaller instance type or implementing request batching."
)
# High memory usage recommendation
if avg_memory_usage > 90:
recommendations.append(
f"High GPU memory usage ({avg_memory_usage:.1f}%). "
"Consider model optimization or larger GPU memory."
)
# Temperature warning
recent_temp = recent_metrics[-1].gpu_temperature
if recent_temp > 85:
recommendations.append(
f"High GPU temperature ({recent_temp}°C). "
"Consider reducing workload or checking cooling."
)
        return recommendations
Best Practices Checklist
✅ Instance Selection: Choose GPU instances based on actual model requirements
✅ Memory Optimization: Use half-precision, model compilation, and memory management
✅ Batch Processing: Implement request batching for GPU efficiency
✅ Cost Management: Use spot instances and auto-scaling for cost optimization
✅ Monitoring: Track GPU utilization, memory, temperature, and costs
✅ Container Optimization: Use NVIDIA base images and proper resource limits
✅ Model Optimization: Use TensorRT, quantization, and model pruning where applicable
✅ Fallback Strategy: Have CPU fallback for GPU failures or unavailability
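The fallback item above can be as simple as a device-selection helper that degrades gracefully instead of crashing when CUDA is missing or broken. A minimal sketch (assumes PyTorch when available; the function name is illustrative):

```python
# CPU fallback sketch: prefer CUDA when it is actually usable, otherwise
# serve on CPU rather than failing the whole service at startup.
def select_device() -> str:
    try:
        import torch
        if torch.cuda.is_available():
            return "cuda"
    except ImportError:
        # torch not installed at all; CPU-only deployment
        pass
    except Exception as e:
        # driver or runtime errors should also degrade, not crash
        print(f"CUDA check failed, falling back to CPU: {e}")
    return "cpu"

device = select_device()
print(f"Serving on {device}")
```

In production you would pair this with reduced batch sizes and longer timeouts on the CPU path, since CPU inference latency is typically an order of magnitude higher.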
Next Steps
GPU acceleration can dramatically improve AI agent performance, but requires careful optimization and cost management. The final article in this series brings everything together—comprehensive production monitoring and observability for your fully optimized, cloud-deployed AI agent systems.
Remember: GPU acceleration is not always the answer. Profile your workloads, measure actual performance gains, and optimize for your specific use case and budget constraints.