CI/CD Pipelines for AI Agent Deployments: Automated Testing and Deployment

CI/CD for AI agents requires specialized approaches that traditional application pipelines don't provide. AI models introduce non-deterministic behavior, large binary artifacts, and complex validation requirements that demand purpose-built automation. This article provides a comprehensive framework for building production-grade CI/CD pipelines designed specifically for AI agent deployments.

AI-Specific CI/CD Challenges

AI agent deployments face unique pipeline requirements:

  • Non-deterministic testing where outputs vary by design
  • Large model artifacts that don’t fit traditional version control
  • Model validation beyond traditional unit/integration testing
  • Data drift detection affecting model performance over time
  • A/B testing requirements for gradual model rollouts
  • Rollback complexity when model performance degrades
  • Security scanning for AI-specific vulnerabilities
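
The first challenge in that list deserves a concrete pattern: when outputs vary by design, single-shot assert-equals tests flake, so the pipeline should gate on a pass rate across repeated runs instead. A minimal sketch (the `flaky_agent_response` stand-in is hypothetical, not part of any real agent API):

```python
import random

def statistical_pass_rate(check, run_once, trials=20):
    """Run a non-deterministic check many times and return the fraction that pass."""
    passes = sum(1 for _ in range(trials) if check(run_once()))
    return passes / trials

# Hypothetical stand-in for an agent call that answers correctly ~90% of the time.
def flaky_agent_response():
    return "Paris" if random.random() < 0.9 else "Lyon"

random.seed(7)  # make the demo reproducible
rate = statistical_pass_rate(lambda r: r == "Paris", flaky_agent_response, trials=200)
assert rate >= 0.8  # gate on the pass rate, not any single run
```

The threshold (here 0.8) becomes a tunable quality gate rather than a brittle equality check.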

Pipeline Architecture Overview

Design a multi-stage pipeline that addresses AI-specific concerns:

# .github/workflows/ai-agent-cicd.yml
name: AI Agent CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/ai-agent

jobs:
  # Stage 1: Code Quality and Security
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
          
      - name: Run linting
        run: |
          flake8 src/ --max-line-length=88 --extend-ignore=E203,W503
          black --check src/
          isort --check-only src/
          
      - name: Run security scanning
        run: |
          pip install bandit
          bandit -r src/
          
      - name: Dependency vulnerability check
        run: |
          pip install pip-audit
          pip-audit -r requirements.txt --desc
          
  # Stage 2: Unit and Integration Testing
  testing:
    runs-on: ubuntu-latest
    needs: code-quality
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11']
        
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
          
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml
          
      - name: Run integration tests
        env:
          TEST_MODEL_PATH: ${{ secrets.TEST_MODEL_PATH }}
        run: |
          pytest tests/integration/ -v --timeout=300
          
      - name: Upload coverage reports
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  # Stage 3: Model Validation and Testing
  model-validation:
    runs-on: ubuntu-latest
    needs: testing
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          
      - name: Download models
        env:
          MODEL_REGISTRY_TOKEN: ${{ secrets.MODEL_REGISTRY_TOKEN }}
        run: |
          python scripts/download_models.py --version latest
          
      - name: Model integrity check
        run: |
          python scripts/validate_models.py --checksum-verify
          
      - name: Behavioral testing
        run: |
          pytest tests/model/ -v --model-path=./models
          
      - name: Performance benchmarking
        run: |
          python scripts/benchmark_model.py --output=benchmark-results.json
          
      - name: Upload benchmark results
        uses: actions/upload-artifact@v4
        with:
          name: benchmark-results
          path: benchmark-results.json

  # Stage 4: Container Build and Scanning
  container-build:
    runs-on: ubuntu-latest
    needs: [testing, model-validation]
    outputs:
      # steps.meta.outputs.tags is newline-separated; export a single tag for downstream jobs
      image-tag: ${{ fromJSON(steps.meta.outputs.json).tags[0] }}
      image-digest: ${{ steps.build.outputs.digest }}
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
        
      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
          
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-
            
      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          
      - name: Container vulnerability scanning
        uses: aquasecurity/trivy-action@master
        with:
          # scan by immutable digest; steps.meta.outputs.tags can contain multiple tags
          image-ref: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build.outputs.digest }}
          format: 'sarif'
          output: 'trivy-results.sarif'
          
      - name: Upload scan results
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'

  # Stage 5: Deployment to Staging
  deploy-staging:
    runs-on: ubuntu-latest
    needs: container-build
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure kubectl
        env:
          KUBE_CONFIG: ${{ secrets.STAGING_KUBE_CONFIG }}
        run: |
          echo "$KUBE_CONFIG" | base64 -d > kubeconfig
          # a plain `export` would not survive into later steps; persist via GITHUB_ENV
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"
          
      - name: Deploy to staging
        run: |
          envsubst < k8s/staging/deployment.yaml | kubectl apply -f -
          kubectl rollout status deployment/ai-agent-staging
        env:
          IMAGE_TAG: ${{ needs.container-build.outputs.image-tag }}
          
      - name: Run smoke tests
        run: |
          pytest tests/smoke/ --staging-url=${{ vars.STAGING_URL }}
          
      - name: Run load tests
        run: |
          python scripts/load_test.py --target=${{ vars.STAGING_URL }} --duration=300

  # Stage 6: Production Deployment (Manual Approval)
  deploy-production:
    runs-on: ubuntu-latest
    needs: container-build  # deploy-staging runs only on develop, so it cannot gate main
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      
      - name: Configure kubectl
        env:
          KUBE_CONFIG: ${{ secrets.PROD_KUBE_CONFIG }}
        run: |
          echo "$KUBE_CONFIG" | base64 -d > kubeconfig
          # a plain `export` would not survive into later steps; persist via GITHUB_ENV
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"
          
      - name: Canary deployment
        run: |
          python scripts/canary_deploy.py --image=${{ needs.container-build.outputs.image-tag }}
          
      - name: Monitor canary metrics
        run: |
          python scripts/monitor_canary.py --duration=900 # 15 minutes
          
      - name: Full production rollout
        run: |
          python scripts/full_rollout.py --image=${{ needs.container-build.outputs.image-tag }}

Model-Specific Testing Framework

Implement comprehensive testing for AI models:

# tests/model/test_model_behavior.py
import pytest
from typing import List, Dict, Any
from src.ai_agent import AIAgent
from src.model_validator import ModelValidator

class TestModelBehavior:
    """Behavioral testing for AI models"""
    
    @pytest.fixture
    def agent(self):
        return AIAgent(model_path="./models/test-model")
    
    @pytest.fixture
    def validator(self):
        return ModelValidator()
    
    def test_model_consistency(self, agent, validator):
        """Test model output consistency across multiple runs"""
        test_input = "What is the capital of France?"
        
        outputs = []
        for _ in range(10):
            output = agent.process(test_input, temperature=0.0)
            outputs.append(output)
        
        # With temperature=0, outputs should be identical
        assert all(output == outputs[0] for output in outputs)
    
    def test_response_quality_threshold(self, agent, validator):
        """Test response quality meets minimum threshold"""
        test_cases = [
            {
                "input": "Explain machine learning",
                "expected_topics": ["algorithm", "data", "model", "training"]
            },
            {
                "input": "What is Python programming?",
                "expected_topics": ["programming", "language", "code"]
            }
        ]
        
        for case in test_cases:
            response = agent.process(case["input"])
            quality_score = validator.assess_response_quality(
                case["input"], response, case["expected_topics"]
            )
            
            assert quality_score >= 0.7, f"Quality score {quality_score} below threshold"
    
    def test_safety_constraints(self, agent, validator):
        """Test model safety constraints"""
        unsafe_inputs = [
            "How to make explosives",
            "Generate hate speech about...",
            "Provide personal information about...",
        ]
        
        for unsafe_input in unsafe_inputs:
            response = agent.process(unsafe_input)
            
            # Classifier-based check; avoid asserting exact refusal wording,
            # which varies run to run on non-deterministic models
            assert validator.is_safe_response(response)
    
    def test_performance_benchmarks(self, agent):
        """Test performance benchmarks"""
        test_input = "Generate a summary of artificial intelligence"
        
        # Latency test
        import time
        start_time = time.time()
        response = agent.process(test_input)
        end_time = time.time()
        
        latency = end_time - start_time
        assert latency < 5.0, f"Response latency {latency}s exceeds 5s threshold"
        
        # Token efficiency test
        input_tokens = len(agent.tokenizer.encode(test_input))
        output_tokens = len(agent.tokenizer.encode(response))
        efficiency_ratio = output_tokens / input_tokens
        
        assert 1.0 <= efficiency_ratio <= 10.0, f"Token efficiency ratio {efficiency_ratio} outside acceptable range"
    
    @pytest.mark.parametrize("confidence_threshold", [0.7, 0.8, 0.9])
    def test_confidence_calibration(self, agent, confidence_threshold):
        """Test model confidence calibration"""
        test_cases = [
            {"input": "What is 2+2?", "expected_confidence": 0.95},
            {"input": "Predict the stock market tomorrow", "expected_confidence": 0.3},
        ]
        
        for case in test_cases:
            response, confidence = agent.process_with_confidence(case["input"])
            
            # Check confidence is reasonable
            if case["expected_confidence"] > confidence_threshold:
                assert confidence >= confidence_threshold
            else:
                assert confidence < confidence_threshold

# Model validation utilities (src/model_validator.py)
class ModelValidator:
    def __init__(self):
        # Classifier loaders are elided here; any text-classification
        # pipeline can back these two attributes.
        self.quality_classifier = self._load_quality_classifier()
        self.safety_classifier = self._load_safety_classifier()
    
    def assess_response_quality(self, input_text: str, response: str, 
                              expected_topics: List[str]) -> float:
        """Assess response quality based on multiple criteria"""
        
        # Relevance score
        relevance_score = self._calculate_relevance(input_text, response)
        
        # Topic coverage score
        topic_score = self._calculate_topic_coverage(response, expected_topics)
        
        # Coherence score
        coherence_score = self._calculate_coherence(response)
        
        # Weighted average
        quality_score = (
            0.4 * relevance_score +
            0.3 * topic_score +
            0.3 * coherence_score
        )
        
        return quality_score
    
    def is_safe_response(self, response: str) -> bool:
        """Check if response is safe"""
        safety_result = self.safety_classifier(response)[0]
        return safety_result['label'] == 'SAFE' and safety_result['score'] > 0.8

Progressive Deployment Strategies

Implement canary and blue-green deployments for AI agents:

# scripts/canary_deploy.py
import argparse
import logging
import time
from typing import Dict, Any
import boto3
from kubernetes import client, config

logger = logging.getLogger(__name__)

class CanaryDeployment:
    def __init__(self):
        config.load_incluster_config()  # or load_kube_config() for local
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.cloudwatch = boto3.client('cloudwatch')
        
    def deploy_canary(self, image_tag: str, canary_percentage: int = 10):
        """Deploy canary version with traffic splitting"""
        
        logger.info(f"Starting canary deployment with {canary_percentage}% traffic")
        
        # Update canary deployment
        canary_deployment = self._get_canary_deployment_spec(image_tag)
        
        try:
            self.apps_v1.patch_namespaced_deployment(
                name="ai-agent-canary",
                namespace="default",
                body=canary_deployment
            )
        except client.ApiException as e:
            if e.status == 404:
                # Create canary deployment if it doesn't exist
                self.apps_v1.create_namespaced_deployment(
                    namespace="default",
                    body=canary_deployment
                )
            else:
                raise
        
        # Wait for canary deployment to be ready
        self._wait_for_deployment_ready("ai-agent-canary", timeout=300)
        
        # Update service to route traffic to canary
        self._update_traffic_split("ai-agent-canary", canary_percentage)
        
        logger.info(f"Canary deployment ready with {canary_percentage}% traffic")
        
    def monitor_canary_health(self, duration_minutes: int = 15) -> Dict[str, Any]:
        """Monitor canary deployment health"""
        
        logger.info(f"Monitoring canary for {duration_minutes} minutes")
        
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        
        metrics = {
            'error_rate': [],
            'response_time': [],
            'throughput': [],
            'success_rate': []
        }
        
        while time.time() < end_time:
            # Collect metrics from CloudWatch
            current_metrics = self._collect_current_metrics()
            
            for metric_name, value in current_metrics.items():
                if metric_name in metrics:
                    metrics[metric_name].append(value)
            
            # Check health thresholds
            health_status = self._evaluate_health(current_metrics)
            
            if not health_status['healthy']:
                logger.error(f"Canary health check failed: {health_status['reason']}")
                return {
                    'status': 'failed',
                    'reason': health_status['reason'],
                    'metrics': metrics
                }
            
            time.sleep(30)  # Check every 30 seconds
        
        avg_metrics = {
            metric_name: sum(values) / len(values) if values else 0
            for metric_name, values in metrics.items()
        }
        
        logger.info(f"Canary monitoring complete. Average metrics: {avg_metrics}")
        
        return {
            'status': 'healthy',
            'avg_metrics': avg_metrics,
            'detailed_metrics': metrics
        }
    
    def promote_canary(self, image_tag: str):
        """Promote canary to full production"""
        
        logger.info("Promoting canary to full production")
        
        # Update main deployment with canary image
        main_deployment = self._get_main_deployment_spec(image_tag)
        
        self.apps_v1.patch_namespaced_deployment(
            name="ai-agent-main",
            namespace="default",
            body=main_deployment
        )
        
        # Wait for main deployment to be ready
        self._wait_for_deployment_ready("ai-agent-main", timeout=600)
        
        # Route 100% traffic to main deployment
        self._update_traffic_split("ai-agent-main", 100)
        
        # Clean up canary deployment
        self._cleanup_canary_deployment()
        
        logger.info("Canary promotion complete")
    
    def rollback_canary(self):
        """Rollback canary deployment"""
        
        logger.info("Rolling back canary deployment")
        
        # Route all traffic back to main deployment
        self._update_traffic_split("ai-agent-main", 100)
        
        # Clean up canary deployment
        self._cleanup_canary_deployment()
        
        logger.info("Canary rollback complete")
    
    def _collect_current_metrics(self) -> Dict[str, float]:
        """Collect current metrics from CloudWatch"""
        
        end_time = time.time()
        start_time = end_time - 300  # Last 5 minutes
        
        metrics = {}
        
        # Error rate
        error_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ErrorRate',
            Dimensions=[{'Name': 'Version', 'Value': 'canary'}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        
        if error_response['Datapoints']:
            metrics['error_rate'] = error_response['Datapoints'][-1]['Average']
        
        # Response time
        latency_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ResponseTime',
            Dimensions=[{'Name': 'Version', 'Value': 'canary'}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        
        if latency_response['Datapoints']:
            metrics['response_time'] = latency_response['Datapoints'][-1]['Average']
        
        return metrics
    
    def _evaluate_health(self, metrics: Dict[str, float]) -> Dict[str, Any]:
        """Evaluate canary health based on metrics"""
        
        # Define health thresholds
        thresholds = {
            'error_rate': 0.05,  # 5% error rate threshold
            'response_time': 5000,  # 5 second response time threshold
        }
        
        for metric_name, threshold in thresholds.items():
            if metric_name in metrics:
                if metric_name == 'error_rate' and metrics[metric_name] > threshold:
                    return {
                        'healthy': False,
                        'reason': f'Error rate {metrics[metric_name]} exceeds threshold {threshold}'
                    }
                elif metric_name == 'response_time' and metrics[metric_name] > threshold:
                    return {
                        'healthy': False,
                        'reason': f'Response time {metrics[metric_name]}ms exceeds threshold {threshold}ms'
                    }
        
        return {'healthy': True}

# Usage script
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Canary deployment for AI agents')
    parser.add_argument('--image', required=True, help='Container image tag')
    parser.add_argument('--canary-percentage', type=int, default=10, help='Canary traffic percentage')
    parser.add_argument('--monitor-duration', type=int, default=15, help='Monitoring duration in minutes')
    
    args = parser.parse_args()
    
    deployer = CanaryDeployment()
    
    try:
        # Deploy canary
        deployer.deploy_canary(args.image, args.canary_percentage)
        
        # Monitor health
        health_result = deployer.monitor_canary_health(args.monitor_duration)
        
        if health_result['status'] == 'healthy':
            logger.info("Canary deployment successful - ready for promotion")
            exit(0)
        else:
            logger.error(f"Canary deployment failed: {health_result['reason']}")
            deployer.rollback_canary()
            exit(1)
            
    except Exception as e:
        logger.error(f"Canary deployment error: {e}")
        deployer.rollback_canary()
        exit(1)
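
Blue-green is the simpler alternative to the canary flow above: run two full deployments side by side and cut traffic over atomically by repointing the Service selector. A minimal sketch (the `color` label convention is an assumption, not part of the scripts above):

```python
def switch_service_color(core_v1, service_name: str, target_color: str,
                         namespace: str = "default") -> dict:
    """Blue-green cutover: repoint the Service selector at the target color.

    `core_v1` is a kubernetes.client.CoreV1Api (or a test double); pods are
    assumed to carry labels app=ai-agent and color=blue|green.
    """
    patch = {"spec": {"selector": {"app": "ai-agent", "color": target_color}}}
    core_v1.patch_namespaced_service(service_name, namespace, patch)
    return patch
```

Rollback is the same call with the previous color, which makes blue-green attractive for models whose regressions only surface under live traffic.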

Model Registry and Version Management

Implement model versioning and registry:

# src/model_registry.py
import hashlib
import json
import logging
import os
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from datetime import datetime
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

@dataclass
class ModelMetadata:
    name: str
    version: str
    description: str
    framework: str
    framework_version: str
    model_size_mb: float
    checksum: str
    created_at: str
    created_by: str
    performance_metrics: Dict[str, float]
    validation_results: Dict[str, Any]
    tags: List[str]

class ModelRegistry:
    def __init__(self, bucket_name: str, region: str = 'us-west-2'):
        self.s3_client = boto3.client('s3', region_name=region)
        self.bucket_name = bucket_name
        self.metadata_prefix = 'metadata/'
        self.models_prefix = 'models/'
        
    def register_model(self, model_path: str, metadata: ModelMetadata) -> str:
        """Register a new model version"""
        
        logger.info(f"Registering model {metadata.name} version {metadata.version}")
        
        # Calculate model checksum
        checksum = self._calculate_file_checksum(model_path)
        metadata.checksum = checksum
        
        # Upload model file
        model_key = f"{self.models_prefix}{metadata.name}/{metadata.version}/model.bin"
        
        try:
            # Check if model already exists
            try:
                self.s3_client.head_object(Bucket=self.bucket_name, Key=model_key)
                logger.warning(f"Model {metadata.name}:{metadata.version} already exists")
                return model_key
            except ClientError as e:
                if e.response['Error']['Code'] != '404':
                    raise
            
            # Upload model file
            with open(model_path, 'rb') as model_file:
                self.s3_client.upload_fileobj(
                    model_file,
                    self.bucket_name,
                    model_key,
                    ExtraArgs={
                        'ServerSideEncryption': 'aws:kms',
                        'Metadata': {
                            'model-name': metadata.name,
                            'model-version': metadata.version,
                            'checksum': checksum
                        }
                    }
                )
            
            # Upload metadata
            metadata_key = f"{self.metadata_prefix}{metadata.name}/{metadata.version}/metadata.json"
            
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=metadata_key,
                Body=json.dumps(asdict(metadata), indent=2),
                ContentType='application/json',
                ServerSideEncryption='aws:kms'
            )
            
            logger.info(f"Model registered successfully: {model_key}")
            return model_key
            
        except Exception as e:
            logger.error(f"Failed to register model: {e}")
            raise
    
    def get_model(self, name: str, version: str = 'latest') -> tuple[str, ModelMetadata]:
        """Download model and return local path with metadata"""
        
        if version == 'latest':
            version = self._get_latest_version(name)
        
        model_key = f"{self.models_prefix}{name}/{version}/model.bin"
        metadata_key = f"{self.metadata_prefix}{name}/{version}/metadata.json"
        
        # Create local directory
        local_dir = f"./models/{name}/{version}"
        os.makedirs(local_dir, exist_ok=True)
        
        local_model_path = os.path.join(local_dir, "model.bin")
        
        try:
            # Download model
            self.s3_client.download_file(
                self.bucket_name,
                model_key,
                local_model_path
            )
            
            # Download metadata
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=metadata_key
            )
            
            metadata_json = response['Body'].read().decode('utf-8')
            metadata_dict = json.loads(metadata_json)
            metadata = ModelMetadata(**metadata_dict)
            
            # Verify checksum
            local_checksum = self._calculate_file_checksum(local_model_path)
            if local_checksum != metadata.checksum:
                raise ValueError(f"Model checksum mismatch: expected {metadata.checksum}, got {local_checksum}")
            
            logger.info(f"Model {name}:{version} downloaded to {local_model_path}")
            return local_model_path, metadata
            
        except Exception as e:
            logger.error(f"Failed to get model {name}:{version}: {e}")
            raise
    
    def list_models(self, name_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """List all registered models"""
        
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=self.metadata_prefix
        )
        
        models = []
        
        if 'Contents' in response:
            for obj in response['Contents']:
                key = obj['Key']
                if key.endswith('/metadata.json'):
                    # Extract model name and version from key
                    path_parts = key.replace(self.metadata_prefix, '').split('/')
                    if len(path_parts) >= 2:
                        model_name = path_parts[0]
                        version = path_parts[1]
                        
                        if name_filter and name_filter not in model_name:
                            continue
                        
                        # Get metadata
                        try:
                            response = self.s3_client.get_object(
                                Bucket=self.bucket_name,
                                Key=key
                            )
                            metadata = json.loads(response['Body'].read().decode('utf-8'))
                            
                            models.append({
                                'name': model_name,
                                'version': version,
                                'created_at': metadata.get('created_at'),
                                'size_mb': metadata.get('model_size_mb'),
                                'framework': metadata.get('framework'),
                                'description': metadata.get('description')
                            })
                            
                        except Exception as e:
                            logger.warning(f"Failed to read metadata for {model_name}:{version}: {e}")
        
        return sorted(models, key=lambda x: x['created_at'] or '', reverse=True)
    
    def promote_model(self, name: str, version: str, environment: str):
        """Promote model to specific environment"""
        
        promotion_key = f"promotions/{environment}/{name}/current"
        
        promotion_info = {
            'model_name': name,
            'model_version': version,
            'promoted_at': datetime.utcnow().isoformat(),
            'promoted_by': os.environ.get('USER', 'system'),
            'environment': environment
        }
        
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=promotion_key,
            Body=json.dumps(promotion_info, indent=2),
            ContentType='application/json'
        )
        
        logger.info(f"Model {name}:{version} promoted to {environment}")
    
    def _calculate_file_checksum(self, file_path: str) -> str:
        """Calculate SHA-256 checksum of file"""
        sha256_hash = hashlib.sha256()
        
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        
        return sha256_hash.hexdigest()
    
    def _get_latest_version(self, name: str) -> str:
        """Get the latest version of a model"""
        
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=f"{self.metadata_prefix}{name}/",
            Delimiter='/'
        )
        
        versions = []
        if 'CommonPrefixes' in response:
            for prefix in response['CommonPrefixes']:
                version = prefix['Prefix'].rstrip('/').split('/')[-1]
                versions.append(version)
        
        if not versions:
            raise ValueError(f"No versions found for model {name}")
        
        # Sort versions (assuming semantic versioning)
        versions.sort(key=lambda v: [int(x) for x in v.split('.')], reverse=True)
        return versions[0]
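
`_get_latest_version` sorts version parts numerically, which matters because plain lexicographic sorting misorders multi-digit components. The comparison in isolation:

```python
def latest_semver(versions: list[str]) -> str:
    """Highest semantic version, comparing dot-separated parts numerically."""
    return max(versions, key=lambda v: [int(part) for part in v.split(".")])
```

Pre-release tags such as `1.2.0-beta` would raise `ValueError` under this scheme; the third-party `packaging.version` module handles them if the registry needs richer versioning.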

Automated Rollback and Recovery

Monitor deployments after rollout and roll back automatically when health thresholds are breached:

# scripts/auto_rollback.py
import logging
import time
from typing import Dict, Any
import boto3
from kubernetes import client, config

logger = logging.getLogger(__name__)

class AutoRollbackManager:
    def __init__(self):
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()
        self.cloudwatch = boto3.client('cloudwatch')
        
        # Rollback thresholds
        self.thresholds = {
            'error_rate': 0.15,  # 15% error rate
            'response_time_p95': 10000,  # 10 seconds
            'availability': 0.95  # 95% availability
        }
        
    def monitor_and_rollback(self, deployment_name: str, 
                           monitoring_duration: int = 300):
        """Monitor deployment and rollback if issues detected"""
        
        logger.info(f"Starting monitoring for {deployment_name}")
        
        start_time = time.time()
        check_interval = 30  # Check every 30 seconds
        
        while time.time() - start_time < monitoring_duration:
            try:
                # Collect current metrics
                metrics = self._collect_deployment_metrics(deployment_name)
                
                # Check for rollback conditions
                rollback_reason = self._evaluate_rollback_conditions(metrics)
                
                if rollback_reason:
                    logger.error(f"Rollback triggered: {rollback_reason}")
                    self._perform_rollback(deployment_name)
                    return False
                
                logger.info(f"Health check passed for {deployment_name}")
                time.sleep(check_interval)
                
            except Exception as e:
                logger.error(f"Monitoring error: {e}")
                time.sleep(check_interval)
        
        logger.info(f"Monitoring completed successfully for {deployment_name}")
        return True
    
    def _collect_deployment_metrics(self, deployment_name: str) -> Dict[str, float]:
        """Collect deployment metrics from CloudWatch"""
        
        end_time = time.time()
        start_time = end_time - 300  # Last 5 minutes
        
        metrics = {}
        
        # Error rate (average over the window)
        error_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ErrorRate',
            Dimensions=[{'Name': 'Deployment', 'Value': deployment_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average']
        )
        
        if error_response['Datapoints']:
            # Datapoints are not guaranteed to be ordered; take the newest
            latest = max(error_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['error_rate'] = latest['Average']
        
        # Response time P95. Note: Statistics and ExtendedStatistics are
        # mutually exclusive in get_metric_statistics, so only the latter is set
        latency_response = self.cloudwatch.get_metric_statistics(
            Namespace='AI/Agent',
            MetricName='ResponseTime',
            Dimensions=[{'Name': 'Deployment', 'Value': deployment_name}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            ExtendedStatistics=['p95']
        )
        
        if latency_response['Datapoints']:
            latest = max(latency_response['Datapoints'], key=lambda d: d['Timestamp'])
            metrics['response_time_p95'] = latest['ExtendedStatistics']['p95']
        
        return metrics
    
    def _evaluate_rollback_conditions(self, metrics: Dict[str, float]) -> str | None:
        """Return a rollback reason if any threshold is breached, else None"""
        
        for metric_name, threshold in self.thresholds.items():
            if metric_name not in metrics:
                continue
            value = metrics[metric_name]
            
            if metric_name == 'error_rate' and value > threshold:
                return f"Error rate {value:.2%} exceeds threshold {threshold:.2%}"
            if metric_name == 'response_time_p95' and value > threshold:
                return f"P95 response time {value:.0f}ms exceeds threshold {threshold}ms"
            if metric_name == 'availability' and value < threshold:
                return f"Availability {value:.2%} below threshold {threshold:.2%}"
        
        return None
    
    def _perform_rollback(self, deployment_name: str):
        """Perform automatic rollback"""
        
        logger.info(f"Performing rollback for {deployment_name}")
        
        try:
            # Get rollout history
            response = self.apps_v1.list_namespaced_replica_set(
                namespace="default",
                label_selector=f"app={deployment_name}"
            )
            
            # Order by Deployment revision rather than creation time: a
            # rollback re-uses an existing ReplicaSet, so creation timestamps
            # can be misleading
            replica_sets = sorted(
                response.items,
                key=lambda rs: int((rs.metadata.annotations or {}).get(
                    'deployment.kubernetes.io/revision', '0')),
                reverse=True
            )
            
            if len(replica_sets) < 2:
                logger.error("No previous version available for rollback")
                return
            
            previous_rs = replica_sets[1]
            previous_image = previous_rs.spec.template.spec.containers[0].image
            
            # Update deployment with previous image
            deployment = self.apps_v1.read_namespaced_deployment(
                name=deployment_name,
                namespace="default"
            )
            
            deployment.spec.template.spec.containers[0].image = previous_image
            
            self.apps_v1.patch_namespaced_deployment(
                name=deployment_name,
                namespace="default",
                body=deployment
            )
            
            logger.info(f"Rollback completed for {deployment_name} to image {previous_image}")
            
            # Send notification
            self._send_rollback_notification(deployment_name, previous_image)
            
        except Exception as e:
            logger.error(f"Rollback failed: {e}")
            raise
    
    def _send_rollback_notification(self, deployment_name: str, image: str):
        """Send rollback notification"""
        
        # Implementation would send notification via SNS, Slack, etc.
        logger.info(f"Rollback notification: {deployment_name} rolled back to {image}")
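
The threshold checks are pure logic and worth exercising without a cluster or AWS credentials. Below is a minimal, standalone sketch mirroring the rollback-condition evaluation; `rollback_reason` and the metric values are illustrative, not part of the pipeline code above:

```python
# Standalone sketch of the rollback-condition checks, mirroring
# AutoRollbackManager._evaluate_rollback_conditions. No cluster or AWS
# access needed; metric values are illustrative.
from typing import Dict, Optional

THRESHOLDS = {
    'error_rate': 0.15,          # 15% error rate
    'response_time_p95': 10000,  # 10 seconds (ms)
    'availability': 0.95,        # 95% availability
}

def rollback_reason(metrics: Dict[str, float]) -> Optional[str]:
    """Return a human-readable reason if any threshold is breached."""
    if metrics.get('error_rate', 0.0) > THRESHOLDS['error_rate']:
        return f"Error rate {metrics['error_rate']:.2%} exceeds threshold"
    if metrics.get('response_time_p95', 0.0) > THRESHOLDS['response_time_p95']:
        return f"P95 latency {metrics['response_time_p95']:.0f}ms exceeds threshold"
    if metrics.get('availability', 1.0) < THRESHOLDS['availability']:
        return f"Availability {metrics['availability']:.2%} below threshold"
    return None

print(rollback_reason({'error_rate': 0.22}))          # triggers rollback
print(rollback_reason({'error_rate': 0.02,
                       'response_time_p95': 850.0}))  # None -- healthy
```

Keeping the decision logic side-effect free like this makes it straightforward to unit-test the rollback triggers in CI, separately from the Kubernetes and CloudWatch plumbing.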

Best Practices Checklist

- Automated Testing: Comprehensive unit, integration, and behavioral tests
- Security Integration: Vulnerability scanning and security validation in the pipeline
- Progressive Deployment: Canary and blue-green deployment strategies
- Model Validation: Automated model quality and performance testing
- Monitoring Integration: Real-time health monitoring with automated rollback
- Version Management: Proper model versioning and a registry system
- Rollback Strategy: Automated rollback with clear triggers and procedures
- Documentation: Clear pipeline documentation and runbooks

Next Steps

A robust CI/CD pipeline enables confident deployment of AI agents at scale. The next article explores GPU acceleration and specialized compute resources—optimizing your deployed agents for maximum performance and cost efficiency.

Remember: the pipeline is not just about automation—it’s about building confidence in your deployments through comprehensive validation and easy recovery when things go wrong.