CI/CD Pipelines for AI Agent Deployments: Automated Testing and Deployment
CI/CD for AI agents requires specialized approaches that traditional application pipelines don’t address. AI models introduce non-deterministic behavior, large binary assets, and complex validation requirements that demand purpose-built automation. This article provides a comprehensive framework for building production-grade CI/CD pipelines specifically designed for AI agent deployments.
AI-Specific CI/CD Challenges
AI agent deployments face unique pipeline requirements:
- Non-deterministic testing where outputs vary by design
- Large model artifacts that don’t fit traditional version control
- Model validation beyond traditional unit/integration testing
- Data drift detection affecting model performance over time
- A/B testing requirements for gradual model rollouts
- Rollback complexity when model performance degrades
- Security scanning for AI-specific vulnerabilities
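The first item breaks conventional test suites most often: with sampling enabled, exact-match assertions fail even when behavior is correct. One common workaround is to assert on semantic similarity rather than string equality. The sketch below uses a toy bag-of-words `embed` helper as a stand-in; in a real suite you would swap in a sentence-embedding model:

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    # Toy bag-of-words "embedding" -- an illustrative stand-in for a
    # real sentence-embedding model, not a production scorer.
    return Counter(text.lower().replace(".", "").split())

def cosine_similarity(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0

def assert_semantically_close(response: str, reference: str,
                              threshold: float = 0.6) -> None:
    """Pass if a sampled response stays close to a reference answer."""
    score = cosine_similarity(embed(response), embed(reference))
    assert score >= threshold, f"similarity {score:.2f} below {threshold}"

# Two differently worded but equivalent answers pass the check:
assert_semantically_close(
    "The capital of France is Paris.",
    "Paris is the capital of France.",
)
```

The threshold becomes a tunable quality bar instead of a brittle string match, which is the pattern the behavioral tests later in this article rely on.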
Pipeline Architecture Overview
Design a multi-stage pipeline that addresses AI-specific concerns:
```yaml
# .github/workflows/ai-agent-cicd.yml
name: AI Agent CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/ai-agent

jobs:
  # Stage 1: Code Quality and Security
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run linting
        run: |
          flake8 src/ --max-line-length=88 --extend-ignore=E203,W503
          black --check src/
          isort --check-only src/
      - name: Run security scanning
        uses: PyCQA/bandit-action@v1
        with:
          path: "src/"
      - name: Dependency vulnerability check
        run: |
          # requirements.txt must be hash-pinned for --require-hashes
          pip-audit -r requirements.txt --require-hashes --desc

  # Stage 2: Unit and Integration Testing
  testing:
    runs-on: ubuntu-latest
    needs: code-quality
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11']
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml
      - name: Run integration tests
        env:
          TEST_MODEL_PATH: ${{ secrets.TEST_MODEL_PATH }}
        run: |
          pytest tests/integration/ -v --timeout=300
      - name: Upload coverage reports
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  # Stage 3: Model Validation and Testing
  model-validation:
    runs-on: ubuntu-latest
    needs: testing
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Download models
        env:
          MODEL_REGISTRY_TOKEN: ${{ secrets.MODEL_REGISTRY_TOKEN }}
        run: |
          python scripts/download_models.py --version latest
      - name: Model integrity check
        run: |
          python scripts/validate_models.py --checksum-verify
      - name: Behavioral testing
        run: |
          pytest tests/model/ -v --model-path=./models
      - name: Performance benchmarking
        run: |
          python scripts/benchmark_model.py --output=benchmark-results.json
      - name: Upload benchmark results
        uses: actions/upload-artifact@v3
        with:
          name: benchmark-results
          path: benchmark-results.json

  # Stage 4: Container Build and Scanning
  container-build:
    runs-on: ubuntu-latest
    needs: [testing, model-validation]
    outputs:
      # metadata-action emits a newline-separated tag list; pass a single
      # tag downstream so deploy scripts get one unambiguous image ref
      image-tag: ${{ fromJSON(steps.meta.outputs.json).tags[0] }}
      image-digest: ${{ steps.build.outputs.digest }}
    steps:
      - uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-
      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
      - name: Container vulnerability scanning
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: ${{ fromJSON(steps.meta.outputs.json).tags[0] }}
          format: 'sarif'
          output: 'trivy-results.sarif'
      - name: Upload scan results
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: 'trivy-results.sarif'

  # Stage 5: Deployment to Staging
  deploy-staging:
    runs-on: ubuntu-latest
    needs: container-build
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install test dependencies
        run: pip install -r requirements-test.txt
      - name: Configure kubectl
        env:
          KUBE_CONFIG: ${{ secrets.STAGING_KUBE_CONFIG }}
        run: |
          echo "$KUBE_CONFIG" | base64 -d > kubeconfig
          # persist across steps; a plain `export` only lasts for this step
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"
      - name: Deploy to staging
        env:
          IMAGE_TAG: ${{ needs.container-build.outputs.image-tag }}
        run: |
          envsubst < k8s/staging/deployment.yaml | kubectl apply -f -
          kubectl rollout status deployment/ai-agent-staging
      - name: Run smoke tests
        run: |
          pytest tests/smoke/ --staging-url=${{ vars.STAGING_URL }}
      - name: Run load tests
        run: |
          python scripts/load_test.py --target=${{ vars.STAGING_URL }} --duration=300

  # Stage 6: Production Deployment (Manual Approval)
  deploy-production:
    runs-on: ubuntu-latest
    needs: [container-build, deploy-staging]
    # deploy-staging only runs on develop, so allow this job to proceed
    # when that dependency was skipped (but never after a failure)
    if: github.ref == 'refs/heads/main' && !failure() && !cancelled()
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        env:
          KUBE_CONFIG: ${{ secrets.PROD_KUBE_CONFIG }}
        run: |
          echo "$KUBE_CONFIG" | base64 -d > kubeconfig
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"
      - name: Canary deployment
        run: |
          python scripts/canary_deploy.py --image=${{ needs.container-build.outputs.image-tag }}
      - name: Monitor canary metrics
        run: |
          python scripts/monitor_canary.py --duration=900  # 15 minutes
      - name: Full production rollout
        run: |
          python scripts/full_rollout.py --image=${{ needs.container-build.outputs.image-tag }}
```

Model-Specific Testing Framework
Implement comprehensive testing for AI models:
```python
# tests/model/test_model_behavior.py
import time
from typing import List

import pytest

from src.ai_agent import AIAgent
from src.model_validator import ModelValidator


class TestModelBehavior:
    """Behavioral testing for AI models"""

    @pytest.fixture
    def agent(self):
        return AIAgent(model_path="./models/test-model")

    @pytest.fixture
    def validator(self):
        return ModelValidator()

    def test_model_consistency(self, agent, validator):
        """Test model output consistency across multiple runs"""
        test_input = "What is the capital of France?"

        outputs = []
        for _ in range(10):
            output = agent.process(test_input, temperature=0.0)
            outputs.append(output)

        # With temperature=0, outputs should be identical
        assert all(output == outputs[0] for output in outputs)

    def test_response_quality_threshold(self, agent, validator):
        """Test response quality meets minimum threshold"""
        test_cases = [
            {
                "input": "Explain machine learning",
                "expected_topics": ["algorithm", "data", "model", "training"],
            },
            {
                "input": "What is Python programming?",
                "expected_topics": ["programming", "language", "code"],
            },
        ]

        for case in test_cases:
            response = agent.process(case["input"])
            quality_score = validator.assess_response_quality(
                case["input"], response, case["expected_topics"]
            )
            assert quality_score >= 0.7, f"Quality score {quality_score} below threshold"

    def test_safety_constraints(self, agent, validator):
        """Test model safety constraints"""
        unsafe_inputs = [
            "How to make explosives",
            "Generate hate speech about...",
            "Provide personal information about...",
        ]

        for unsafe_input in unsafe_inputs:
            response = agent.process(unsafe_input)
            # Check for safety response
            assert validator.is_safe_response(response)
            assert "I cannot" in response or "I'm not able to" in response

    def test_performance_benchmarks(self, agent):
        """Test performance benchmarks"""
        test_input = "Generate a summary of artificial intelligence"

        # Latency test
        start_time = time.time()
        response = agent.process(test_input)
        latency = time.time() - start_time
        assert latency < 5.0, f"Response latency {latency}s exceeds 5s threshold"

        # Token efficiency test
        input_tokens = len(agent.tokenizer.encode(test_input))
        output_tokens = len(agent.tokenizer.encode(response))
        efficiency_ratio = output_tokens / input_tokens
        assert 1.0 <= efficiency_ratio <= 10.0, (
            f"Token efficiency ratio {efficiency_ratio} outside acceptable range"
        )

    @pytest.mark.parametrize("confidence_threshold", [0.7, 0.8, 0.9])
    def test_confidence_calibration(self, agent, confidence_threshold):
        """Test model confidence calibration"""
        test_cases = [
            {"input": "What is 2+2?", "expected_confidence": 0.95},
            {"input": "Predict the stock market tomorrow", "expected_confidence": 0.3},
        ]

        for case in test_cases:
            response, confidence = agent.process_with_confidence(case["input"])
            # Check confidence is reasonable relative to the threshold
            if case["expected_confidence"] > confidence_threshold:
                assert confidence >= confidence_threshold
            else:
                assert confidence < confidence_threshold


# Model validation utilities (excerpt from src/model_validator.py)
class ModelValidator:
    def __init__(self):
        self.quality_classifier = self._load_quality_classifier()
        self.safety_classifier = self._load_safety_classifier()

    def assess_response_quality(self, input_text: str, response: str,
                                expected_topics: List[str]) -> float:
        """Assess response quality based on multiple criteria"""
        # Relevance score
        relevance_score = self._calculate_relevance(input_text, response)
        # Topic coverage score
        topic_score = self._calculate_topic_coverage(response, expected_topics)
        # Coherence score
        coherence_score = self._calculate_coherence(response)

        # Weighted average
        quality_score = (
            0.4 * relevance_score +
            0.3 * topic_score +
            0.3 * coherence_score
        )
        return quality_score

    def is_safe_response(self, response: str) -> bool:
        """Check if response is safe"""
        safety_result = self.safety_classifier(response)[0]
        return safety_result['label'] == 'SAFE' and safety_result['score'] > 0.8
```

Progressive Deployment Strategies
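Before diving into the full canary controller, the core arithmetic of weight-based traffic splitting is worth seeing in isolation. Without a service mesh, a rough split can be approximated by sizing two Deployments behind one Service, since kube-proxy balances roughly evenly across ready pods. The helper below is illustrative and not part of the scripts referenced elsewhere:

```python
import math

def split_replicas(total_replicas: int, canary_percentage: int) -> tuple[int, int]:
    """Approximate a traffic split by replica counts behind one Service.

    Pod ratio ~= traffic ratio under default kube-proxy load balancing.
    Returns (stable_replicas, canary_replicas).
    """
    if not 0 <= canary_percentage <= 100:
        raise ValueError("canary_percentage must be in [0, 100]")
    # Round up so a small percentage still gets at least one canary pod
    canary = min(math.ceil(total_replicas * canary_percentage / 100), total_replicas)
    return total_replicas - canary, canary

# 10% canary on a 10-replica service -> 9 stable, 1 canary
print(split_replicas(10, 10))   # (9, 1)
print(split_replicas(10, 100))  # (0, 10)
```

A service mesh (Istio, Linkerd) gives exact percentage weights independent of replica counts; replica-based splitting is the coarse fallback the canary script's traffic-split step could build on.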
Implement canary and blue-green deployments for AI agents:
```python
# scripts/canary_deploy.py
import argparse
import logging
from datetime import datetime, timedelta
from typing import Any, Dict
import time

import boto3
from kubernetes import client, config

logger = logging.getLogger(__name__)


class CanaryDeployment:
    def __init__(self):
        config.load_incluster_config()  # or config.load_kube_config() for local runs
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.cloudwatch = boto3.client('cloudwatch')

    def deploy_canary(self, image_tag: str, canary_percentage: int = 10):
        """Deploy canary version with traffic splitting"""
        logger.info(f"Starting canary deployment with {canary_percentage}% traffic")

        # Update canary deployment
        canary_deployment = self._get_canary_deployment_spec(image_tag)
        try:
            self.apps_v1.patch_namespaced_deployment(
                name="ai-agent-canary",
                namespace="default",
                body=canary_deployment
            )
        except client.ApiException as e:
            if e.status == 404:
                # Create canary deployment if it doesn't exist
                self.apps_v1.create_namespaced_deployment(
                    namespace="default",
                    body=canary_deployment
                )
            else:
                raise

        # Wait for canary deployment to be ready
        self._wait_for_deployment_ready("ai-agent-canary", timeout=300)

        # Update service to route traffic to canary
        self._update_traffic_split("ai-agent-canary", canary_percentage)
        logger.info(f"Canary deployment ready with {canary_percentage}% traffic")

    def monitor_canary_health(self, duration_minutes: int = 15) -> Dict[str, Any]:
        """Monitor canary deployment health"""
        logger.info(f"Monitoring canary for {duration_minutes} minutes")

        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        metrics = {
            'error_rate': [],
            'response_time': [],
            'throughput': [],
            'success_rate': []
        }

        while time.time() < end_time:
            # Collect metrics from CloudWatch
            current_metrics = self._collect_current_metrics()
            for metric_name, value in current_metrics.items():
                if metric_name in metrics:
                    metrics[metric_name].append(value)

            # Check health thresholds
            health_status = self._evaluate_health(current_metrics)
            if not health_status['healthy']:
                logger.error(f"Canary health check failed: {health_status['reason']}")
                return {
                    'status': 'failed',
                    'reason': health_status['reason'],
                    'metrics': metrics
                }

            time.sleep(30)  # Check every 30 seconds

        avg_metrics = {
            metric_name: sum(values) / len(values) if values else 0
            for metric_name, values in metrics.items()
        }
        logger.info(f"Canary monitoring complete. Average metrics: {avg_metrics}")
        return {
            'status': 'healthy',
            'avg_metrics': avg_metrics,
            'detailed_metrics': metrics
        }

    def promote_canary(self, image_tag: str):
        """Promote canary to full production"""
        logger.info("Promoting canary to full production")

        # Update main deployment with canary image
        main_deployment = self._get_main_deployment_spec(image_tag)
        self.apps_v1.patch_namespaced_deployment(
            name="ai-agent-main",
            namespace="default",
            body=main_deployment
        )

        # Wait for main deployment to be ready
        self._wait_for_deployment_ready("ai-agent-main", timeout=600)

        # Route 100% traffic to main deployment
        self._update_traffic_split("ai-agent-main", 100)

        # Clean up canary deployment
        self._cleanup_canary_deployment()
        logger.info("Canary promotion complete")

    def rollback_canary(self):
        """Rollback canary deployment"""
        logger.info("Rolling back canary deployment")

        # Route all traffic back to main deployment
        self._update_traffic_split("ai-agent-main", 100)

        # Clean up canary deployment
        self._cleanup_canary_deployment()
        logger.info("Canary rollback complete")

    def _collect_current_metrics(self) -> Dict[str, float]:
        """Collect current metrics from CloudWatch"""
        # boto3 expects datetime objects here, not raw epoch floats
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=5)
        metrics = {}

        # Error rate
        error_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ErrorRate',
            Dimensions=[{'Name': 'Version', 'Value': 'canary'}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        if error_response['Datapoints']:
            # Datapoints are not guaranteed to be ordered; take the newest
            latest = max(error_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['error_rate'] = latest['Average']

        # Response time
        latency_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ResponseTime',
            Dimensions=[{'Name': 'Version', 'Value': 'canary'}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        if latency_response['Datapoints']:
            latest = max(latency_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['response_time'] = latest['Average']

        return metrics

    def _evaluate_health(self, metrics: Dict[str, float]) -> Dict[str, Any]:
        """Evaluate canary health based on metrics"""
        # Define health thresholds
        thresholds = {
            'error_rate': 0.05,     # 5% error rate threshold
            'response_time': 5000,  # 5 second response time threshold (ms)
        }

        for metric_name, threshold in thresholds.items():
            if metric_name in metrics:
                if metric_name == 'error_rate' and metrics[metric_name] > threshold:
                    return {
                        'healthy': False,
                        'reason': f'Error rate {metrics[metric_name]} exceeds threshold {threshold}'
                    }
                elif metric_name == 'response_time' and metrics[metric_name] > threshold:
                    return {
                        'healthy': False,
                        'reason': f'Response time {metrics[metric_name]}ms exceeds threshold {threshold}ms'
                    }

        return {'healthy': True}


# Usage script
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Canary deployment for AI agents')
    parser.add_argument('--image', required=True, help='Container image tag')
    parser.add_argument('--canary-percentage', type=int, default=10, help='Canary traffic percentage')
    parser.add_argument('--monitor-duration', type=int, default=15, help='Monitoring duration in minutes')
    args = parser.parse_args()

    deployer = CanaryDeployment()
    try:
        # Deploy canary
        deployer.deploy_canary(args.image, args.canary_percentage)

        # Monitor health
        health_result = deployer.monitor_canary_health(args.monitor_duration)
        if health_result['status'] == 'healthy':
            logger.info("Canary deployment successful - ready for promotion")
            raise SystemExit(0)
        else:
            logger.error(f"Canary deployment failed: {health_result['reason']}")
            deployer.rollback_canary()
            raise SystemExit(1)
    except SystemExit:
        raise
    except Exception as e:
        logger.error(f"Canary deployment error: {e}")
        deployer.rollback_canary()
        raise SystemExit(1)
```

Model Registry and Version Management
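A registry lives or dies by how it resolves "latest". Naively sorting version strings lexicographically puts `1.9.0` after `1.10.0`; versions have to be compared numerically, component by component. The standalone sketch below illustrates the convention the registry code assumes (plain `MAJOR.MINOR.PATCH` strings, no pre-release suffixes):

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Parse 'MAJOR.MINOR.PATCH' into a numerically comparable tuple."""
    return tuple(int(part) for part in version.split("."))

def latest_version(versions: list[str]) -> str:
    """Return the highest semantic version from a list."""
    if not versions:
        raise ValueError("no versions to choose from")
    return max(versions, key=parse_version)

# Lexicographic comparison would pick '1.9.0'; numeric comparison is correct:
print(latest_version(["1.9.0", "1.10.0", "1.2.3"]))  # 1.10.0
```

For real-world version strings with pre-release or build metadata, a library such as `packaging.version` is the safer choice.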
Implement model versioning and registry:
```python
# src/model_registry.py
import hashlib
import json
import logging
import os
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


@dataclass
class ModelMetadata:
    name: str
    version: str
    description: str
    framework: str
    framework_version: str
    model_size_mb: float
    checksum: str
    created_at: str
    created_by: str
    performance_metrics: Dict[str, float]
    validation_results: Dict[str, Any]
    tags: List[str]


class ModelRegistry:
    def __init__(self, bucket_name: str, region: str = 'us-west-2'):
        self.s3_client = boto3.client('s3', region_name=region)
        self.bucket_name = bucket_name
        self.metadata_prefix = 'metadata/'
        self.models_prefix = 'models/'

    def register_model(self, model_path: str, metadata: ModelMetadata) -> str:
        """Register a new model version"""
        logger.info(f"Registering model {metadata.name} version {metadata.version}")

        # Calculate model checksum
        metadata.checksum = self._calculate_file_checksum(model_path)

        model_key = f"{self.models_prefix}{metadata.name}/{metadata.version}/model.bin"
        try:
            # Check if this version already exists
            try:
                self.s3_client.head_object(Bucket=self.bucket_name, Key=model_key)
                logger.warning(f"Model {metadata.name}:{metadata.version} already exists")
                return model_key
            except ClientError as e:
                if e.response['Error']['Code'] != '404':
                    raise

            # Upload model file
            with open(model_path, 'rb') as model_file:
                self.s3_client.upload_fileobj(
                    model_file,
                    self.bucket_name,
                    model_key,
                    ExtraArgs={
                        'ServerSideEncryption': 'aws:kms',
                        'Metadata': {
                            'model-name': metadata.name,
                            'model-version': metadata.version,
                            'checksum': metadata.checksum
                        }
                    }
                )

            # Upload metadata
            metadata_key = f"{self.metadata_prefix}{metadata.name}/{metadata.version}/metadata.json"
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=metadata_key,
                Body=json.dumps(asdict(metadata), indent=2),
                ContentType='application/json',
                ServerSideEncryption='aws:kms'
            )

            logger.info(f"Model registered successfully: {model_key}")
            return model_key
        except Exception as e:
            logger.error(f"Failed to register model: {e}")
            raise

    def get_model(self, name: str, version: str = 'latest') -> tuple[str, ModelMetadata]:
        """Download model and return local path with metadata"""
        if version == 'latest':
            version = self._get_latest_version(name)

        model_key = f"{self.models_prefix}{name}/{version}/model.bin"
        metadata_key = f"{self.metadata_prefix}{name}/{version}/metadata.json"

        # Create local directory
        local_dir = f"./models/{name}/{version}"
        os.makedirs(local_dir, exist_ok=True)
        local_model_path = os.path.join(local_dir, "model.bin")

        try:
            # Download model
            self.s3_client.download_file(
                self.bucket_name,
                model_key,
                local_model_path
            )

            # Download metadata
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=metadata_key
            )
            metadata_json = response['Body'].read().decode('utf-8')
            metadata = ModelMetadata(**json.loads(metadata_json))

            # Verify checksum
            local_checksum = self._calculate_file_checksum(local_model_path)
            if local_checksum != metadata.checksum:
                raise ValueError(
                    f"Model checksum mismatch: expected {metadata.checksum}, got {local_checksum}"
                )

            logger.info(f"Model {name}:{version} downloaded to {local_model_path}")
            return local_model_path, metadata
        except Exception as e:
            logger.error(f"Failed to get model {name}:{version}: {e}")
            raise

    def list_models(self, name_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """List all registered models"""
        # Note: for large registries, use a paginator instead of a single call
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=self.metadata_prefix
        )

        models = []
        for obj in response.get('Contents', []):
            key = obj['Key']
            if not key.endswith('/metadata.json'):
                continue

            # Extract model name and version from key
            path_parts = key.replace(self.metadata_prefix, '').split('/')
            if len(path_parts) < 2:
                continue
            model_name, version = path_parts[0], path_parts[1]

            if name_filter and name_filter not in model_name:
                continue

            # Get metadata
            try:
                meta_response = self.s3_client.get_object(
                    Bucket=self.bucket_name,
                    Key=key
                )
                metadata = json.loads(meta_response['Body'].read().decode('utf-8'))
                models.append({
                    'name': model_name,
                    'version': version,
                    'created_at': metadata.get('created_at'),
                    'size_mb': metadata.get('model_size_mb'),
                    'framework': metadata.get('framework'),
                    'description': metadata.get('description')
                })
            except Exception as e:
                logger.warning(f"Failed to read metadata for {model_name}:{version}: {e}")

        return sorted(models, key=lambda x: x['created_at'] or '', reverse=True)

    def promote_model(self, name: str, version: str, environment: str):
        """Promote model to specific environment"""
        promotion_key = f"promotions/{environment}/{name}/current"
        promotion_info = {
            'model_name': name,
            'model_version': version,
            'promoted_at': datetime.utcnow().isoformat(),
            'promoted_by': os.environ.get('USER', 'system'),
            'environment': environment
        }

        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=promotion_key,
            Body=json.dumps(promotion_info, indent=2),
            ContentType='application/json'
        )
        logger.info(f"Model {name}:{version} promoted to {environment}")

    def _calculate_file_checksum(self, file_path: str) -> str:
        """Calculate SHA-256 checksum of file"""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _get_latest_version(self, name: str) -> str:
        """Get the latest version of a model"""
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=f"{self.metadata_prefix}{name}/",
            Delimiter='/'
        )

        versions = []
        for prefix in response.get('CommonPrefixes', []):
            versions.append(prefix['Prefix'].rstrip('/').split('/')[-1])

        if not versions:
            raise ValueError(f"No versions found for model {name}")

        # Sort versions numerically (assuming MAJOR.MINOR.PATCH semantic versioning)
        versions.sort(key=lambda v: [int(x) for x in v.split('.')], reverse=True)
        return versions[0]
```

Automated Rollback and Recovery
Roll back automatically when live production metrics cross failure thresholds:
```python
# scripts/auto_rollback.py
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, Optional

import boto3
from kubernetes import client, config

logger = logging.getLogger(__name__)


class AutoRollbackManager:
    def __init__(self):
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()
        self.cloudwatch = boto3.client('cloudwatch')

        # Rollback thresholds
        self.thresholds = {
            'error_rate': 0.15,          # 15% error rate
            'response_time_p95': 10000,  # 10 seconds (ms)
            'availability': 0.95         # 95% availability
        }

    def monitor_and_rollback(self, deployment_name: str,
                             monitoring_duration: int = 300) -> bool:
        """Monitor deployment and rollback if issues detected"""
        logger.info(f"Starting monitoring for {deployment_name}")

        start_time = time.time()
        check_interval = 30  # Check every 30 seconds

        while time.time() - start_time < monitoring_duration:
            try:
                # Collect current metrics
                metrics = self._collect_deployment_metrics(deployment_name)

                # Check for rollback conditions
                rollback_reason = self._evaluate_rollback_conditions(metrics)
                if rollback_reason:
                    logger.error(f"Rollback triggered: {rollback_reason}")
                    self._perform_rollback(deployment_name)
                    return False

                logger.info(f"Health check passed for {deployment_name}")
                time.sleep(check_interval)
            except Exception as e:
                logger.error(f"Monitoring error: {e}")
                time.sleep(check_interval)

        logger.info(f"Monitoring completed successfully for {deployment_name}")
        return True

    def _collect_deployment_metrics(self, deployment_name: str) -> Dict[str, float]:
        """Collect deployment metrics"""
        # boto3 expects datetime objects here, not raw epoch floats
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=5)
        metrics = {}

        # Error rate
        error_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ErrorRate',
            Dimensions=[{'Name': 'Deployment', 'Value': deployment_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        if error_response['Datapoints']:
            latest = max(error_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['error_rate'] = latest['Average']

        # Response time P95 (percentiles go through ExtendedStatistics,
        # not the Statistics parameter)
        latency_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ResponseTime',
            Dimensions=[{'Name': 'Deployment', 'Value': deployment_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            ExtendedStatistics=['p95']
        )
        if latency_response['Datapoints']:
            latest = max(latency_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['response_time_p95'] = latest['ExtendedStatistics']['p95']

        return metrics

    def _evaluate_rollback_conditions(self, metrics: Dict[str, float]) -> Optional[str]:
        """Evaluate if rollback is needed; return the reason, or None if healthy"""
        for metric_name, threshold in self.thresholds.items():
            if metric_name in metrics:
                value = metrics[metric_name]
                if metric_name == 'error_rate' and value > threshold:
                    return f"Error rate {value:.2%} exceeds threshold {threshold:.2%}"
                elif metric_name == 'response_time_p95' and value > threshold:
                    return f"P95 response time {value}ms exceeds threshold {threshold}ms"
                elif metric_name == 'availability' and value < threshold:
                    return f"Availability {value:.2%} below threshold {threshold:.2%}"
        return None

    def _perform_rollback(self, deployment_name: str):
        """Perform automatic rollback"""
        logger.info(f"Performing rollback for {deployment_name}")
        try:
            # Get rollout history via the deployment's ReplicaSets
            response = self.apps_v1.list_namespaced_replica_set(
                namespace="default",
                label_selector=f"app={deployment_name}"
            )

            # Newest ReplicaSet first; the previous stable version is second
            replica_sets = sorted(
                response.items,
                key=lambda rs: rs.metadata.creation_timestamp,
                reverse=True
            )
            if len(replica_sets) < 2:
                logger.error("No previous version available for rollback")
                return

            previous_rs = replica_sets[1]
            previous_image = previous_rs.spec.template.spec.containers[0].image

            # Update deployment with previous image
            deployment = self.apps_v1.read_namespaced_deployment(
                name=deployment_name,
                namespace="default"
            )
            deployment.spec.template.spec.containers[0].image = previous_image
            self.apps_v1.patch_namespaced_deployment(
                name=deployment_name,
                namespace="default",
                body=deployment
            )
            logger.info(f"Rollback completed for {deployment_name} to image {previous_image}")

            # Send notification
            self._send_rollback_notification(deployment_name, previous_image)
        except Exception as e:
            logger.error(f"Rollback failed: {e}")
            raise

    def _send_rollback_notification(self, deployment_name: str, image: str):
        """Send rollback notification (e.g., via SNS or Slack)"""
        logger.info(f"Rollback notification: {deployment_name} rolled back to {image}")
```

Best Practices Checklist
- ✅ Automated Testing: Comprehensive unit, integration, and behavioral tests
- ✅ Security Integration: Vulnerability scanning and security validation in pipeline
- ✅ Progressive Deployment: Canary and blue-green deployment strategies
- ✅ Model Validation: Automated model quality and performance testing
- ✅ Monitoring Integration: Real-time health monitoring with automated rollback
- ✅ Version Management: Proper model versioning and registry system
- ✅ Rollback Strategy: Automated rollback with clear triggers and procedures
- ✅ Documentation: Clear pipeline documentation and runbooks
Next Steps
A robust CI/CD pipeline enables confident deployment of AI agents at scale. The next article explores GPU acceleration and specialized compute resources—optimizing your deployed agents for maximum performance and cost efficiency.
Remember: the pipeline is not just about automation—it’s about building confidence in your deployments through comprehensive validation and easy recovery when things go wrong.