Production Best Practices: Deploying Reliable AI Agents
Deploying AI agents to production requires a different mindset than traditional software. Non-deterministic behavior, evolving models, and user safety considerations demand specialized approaches to testing, deployment, and maintenance. This final article in our series provides a comprehensive production deployment framework.
Testing Strategies for Non-Deterministic Systems
Traditional unit tests fall short when outputs vary by design. AI agent testing requires probabilistic approaches and behavioral validation.
1. Behavioral Testing Framework
import pytest
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class BehaviorExpectation:
    """A named behavioral check with a pass-rate threshold.

    Attributes:
        name: Human-readable identifier for the behavior under test.
        validator: Predicate applied to one agent response; True means pass.
        confidence_threshold: Minimum fraction of runs that must pass.
        sample_size: Suggested number of runs when evaluating this behavior.
    """

    name: str
    validator: Callable[[str], bool]
    confidence_threshold: float = 0.8
    sample_size: int = 10
class AgentBehaviorTester:
    """Runs an agent repeatedly and scores it against behavioral expectations."""

    def __init__(self, agent):
        # Any object exposing an async process(prompt) -> str method.
        self.agent = agent

    async def test_behavior_consistency(self,
                                        input_prompt: str,
                                        expectations: List[BehaviorExpectation],
                                        iterations: int = 20) -> dict:
        """Test behavioral consistency across multiple runs.

        For each expectation the agent is invoked `iterations` times; the
        report records the pass rate, whether it clears the expectation's
        confidence threshold, and up to three sample responses.
        """
        report = {}
        for spec in expectations:
            # Fresh batch of responses for this expectation.
            replies = [await self.agent.process(input_prompt)
                       for _ in range(iterations)]
            hits = sum(1 for reply in replies if spec.validator(reply))
            rate = hits / iterations
            report[spec.name] = {
                'success_rate': rate,
                'passes_threshold': rate >= spec.confidence_threshold,
                'sample_responses': replies[:3],  # Store sample responses
                'expectation': spec,
            }
        return report
# Example usage
def test_customer_service_agent():
    """End-to-end behavioral check for the customer-service agent.

    Drives the async tester with asyncio.run so the test stays synchronous;
    the original used a bare ``await`` inside a non-async ``def``, which is
    a SyntaxError.
    """
    import asyncio

    agent = CustomerServiceAgent()
    tester = AgentBehaviorTester(agent)
    expectations = [
        BehaviorExpectation(
            name="provides_helpful_response",
            # Helpful replies mention "help" and are reasonably long.
            validator=lambda r: "help" in r.lower() and len(r) > 50
        ),
        BehaviorExpectation(
            name="maintains_professional_tone",
            validator=lambda r: not any(word in r.lower() for word in ["stupid", "dumb", "idiot"])
        ),
        BehaviorExpectation(
            name="includes_contact_info",
            validator=lambda r: "contact" in r.lower() or "reach out" in r.lower()
        )
    ]
    results = asyncio.run(tester.test_behavior_consistency(
        "I'm having trouble with my order",
        expectations
    ))
    # Assert that all expectations meet confidence thresholds
    for expectation_name, result in results.items():
        assert result['passes_threshold'], f"Failed expectation: {expectation_name}"

# 2. Quality Regression Testing
class QualityRegressionSuite:
    """Compares a candidate model against a baseline on a fixed test set."""

    def __init__(self, baseline_model: str, new_model: str):
        self.baseline_agent = Agent(model=baseline_model)
        self.new_agent = Agent(model=new_model)
        # NOTE(review): load_test_cases is provided elsewhere; each case is
        # assumed to expose .input and .expected_output — confirm schema.
        self.test_cases = self.load_test_cases()

    async def run_regression_tests(self) -> dict:
        """Compare new model against baseline on quality metrics.

        Returns per-model score lists, cases whose score moved by more than
        0.1 in either direction, and a 'summary' aggregate.
        """
        results = {
            'baseline_scores': [],
            'new_model_scores': [],
            'degraded_cases': [],
            'improved_cases': []
        }
        for test_case in self.test_cases:
            # Get responses from both models on the same input.
            baseline_response = await self.baseline_agent.process(test_case.input)
            new_response = await self.new_agent.process(test_case.input)
            # Evaluate quality (scorer helper defined elsewhere).
            baseline_score = await self.evaluate_quality(
                test_case.input, baseline_response, test_case.expected_output
            )
            new_score = await self.evaluate_quality(
                test_case.input, new_response, test_case.expected_output
            )
            results['baseline_scores'].append(baseline_score)
            results['new_model_scores'].append(new_score)
            # Track significant changes in either direction.
            score_diff = new_score - baseline_score
            if score_diff < -0.1:  # Significant degradation
                results['degraded_cases'].append({
                    'test_case': test_case,
                    'baseline_score': baseline_score,
                    'new_score': new_score,
                    'difference': score_diff
                })
            elif score_diff > 0.1:  # Significant improvement
                results['improved_cases'].append({
                    'test_case': test_case,
                    'baseline_score': baseline_score,
                    'new_score': new_score,
                    'difference': score_diff
                })
        # Calculate overall statistics
        results['summary'] = self._calculate_summary(results)
        return results

    def _calculate_summary(self, results: dict) -> dict:
        """Aggregate per-case scores; safe on an empty test set (the
        original divided by zero when no test cases were loaded)."""
        baseline_scores = results['baseline_scores']
        new_scores = results['new_model_scores']
        baseline_avg = sum(baseline_scores) / len(baseline_scores) if baseline_scores else 0.0
        new_avg = sum(new_scores) / len(new_scores) if new_scores else 0.0
        return {
            'baseline_average': baseline_avg,
            'new_model_average': new_avg,
            'overall_improvement': new_avg - baseline_avg,
            'degraded_count': len(results['degraded_cases']),
            'improved_count': len(results['improved_cases']),
            'regression_threshold_passed': len(results['degraded_cases']) < 5  # Max 5 degraded cases
        }

# 3. A/B Testing for Gradual Rollouts
import hashlib
import logging
import random
import time
from enum import Enum
class RolloutStrategy(Enum):
    """How treatment traffic is selected during a gradual rollout."""

    PERCENTAGE = "percentage"      # random fraction of all requests
    USER_COHORT = "user_cohort"    # sticky assignment derived from user id
    FEATURE_FLAG = "feature_flag"  # delegated to a feature-flag service
class ABTestingFramework:
    """Manages control/treatment experiments for gradual agent rollouts."""

    def __init__(self):
        self.experiments = {}
        # NOTE(review): MetricsCollector is provided elsewhere in the project.
        self.metrics_collector = MetricsCollector()

    def create_experiment(self, experiment_id: str,
                          control_agent: Agent,
                          treatment_agent: Agent,
                          rollout_strategy: RolloutStrategy,
                          rollout_percentage: float = 5.0):
        """Create new A/B test experiment.

        Args:
            experiment_id: Unique key for the experiment.
            control_agent: Agent serving the control group.
            treatment_agent: Candidate agent under evaluation.
            rollout_strategy: How treatment traffic is selected.
            rollout_percentage: Share of traffic (0-100) sent to treatment.
        """
        self.experiments[experiment_id] = {
            # Store the id inside the record: _should_use_treatment's
            # FEATURE_FLAG branch reads experiment['id'], which the original
            # never stored (KeyError at runtime).
            'id': experiment_id,
            'control_agent': control_agent,
            'treatment_agent': treatment_agent,
            'strategy': rollout_strategy,
            'percentage': rollout_percentage,
            'start_time': time.time(),
            'metrics': {
                'control': {'requests': 0, 'successes': 0, 'avg_quality': 0},
                'treatment': {'requests': 0, 'successes': 0, 'avg_quality': 0}
            }
        }

    async def route_request(self, experiment_id: str,
                            request_data: dict) -> tuple[str, str]:
        """Route request to control or treatment group and record metrics."""
        experiment = self.experiments[experiment_id]
        # Determine assignment for this request.
        if self._should_use_treatment(experiment, request_data):
            agent = experiment['treatment_agent']
            group = 'treatment'
        else:
            agent = experiment['control_agent']
            group = 'control'
        # Process request and collect metrics.
        # NOTE(review): _record_experiment_metrics is expected to be
        # provided elsewhere; it is not defined in this class.
        response = await agent.process(request_data['input'])
        await self._record_experiment_metrics(
            experiment_id, group, request_data, response
        )
        return group, response

    def _should_use_treatment(self, experiment: dict, request_data: dict) -> bool:
        """Determine if request should go to treatment group."""
        if experiment['strategy'] == RolloutStrategy.PERCENTAGE:
            return random.random() < (experiment['percentage'] / 100)
        elif experiment['strategy'] == RolloutStrategy.USER_COHORT:
            user_id = request_data.get('user_id', '')
            # Sticky assignment: bucket by a digest of the user id. The
            # original used the built-in hash(), which is randomized per
            # process (PYTHONHASHSEED) and therefore NOT consistent across
            # restarts as its comment claimed.
            bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
            return bucket < experiment['percentage']
        elif experiment['strategy'] == RolloutStrategy.FEATURE_FLAG:
            # NOTE(review): feature_flag_service is never initialized in
            # __init__ — it must be injected before FEATURE_FLAG rollouts.
            return self.feature_flag_service.is_enabled(
                f"experiment_{experiment['id']}",
                request_data.get('user_id')
            )
        return False

    def analyze_experiment_results(self, experiment_id: str) -> dict:
        """Analyze A/B test results and provide recommendations."""
        experiment = self.experiments[experiment_id]
        control_metrics = experiment['metrics']['control']
        treatment_metrics = experiment['metrics']['treatment']
        # Statistical significance testing (helper provided elsewhere).
        significance_result = self._calculate_statistical_significance(
            control_metrics, treatment_metrics
        )
        analysis = {
            'experiment_id': experiment_id,
            'duration_hours': (time.time() - experiment['start_time']) / 3600,
            'control_performance': control_metrics,
            'treatment_performance': treatment_metrics,
            'statistical_significance': significance_result,
            'recommendation': self._generate_recommendation(
                control_metrics, treatment_metrics, significance_result
            )
        }
        return analysis

# Security Considerations
1. Prompt Injection Prevention
import re
from typing import List
class PromptInjectionDetector:
    """Heuristic detector for prompt-injection attempts in user input."""

    def __init__(self):
        # Common injection patterns
        self.injection_patterns = [
            r"ignore\s+previous\s+instructions",
            r"forget\s+everything",
            r"new\s+instructions?:",
            r"system\s*:\s*you\s+are",
            r"act\s+as\s+if\s+you\s+are",
            r"pretend\s+to\s+be",
            r"roleplay\s+as",
        ]
        self.compiled_patterns = [re.compile(pattern, re.IGNORECASE)
                                  for pattern in self.injection_patterns]

    def detect_injection(self, user_input: str) -> dict:
        """Detect potential prompt injection attempts.

        Returns a dict with an overall risk score in [0, 1], the matched
        patterns, and flags for "potential injection" (> 0.5) and
        "mitigation applied" (> 0.7).
        """
        detected_patterns = []
        risk_score = 0.0
        # Check for known injection patterns.
        for i, pattern in enumerate(self.compiled_patterns):
            if pattern.search(user_input):
                detected_patterns.append(self.injection_patterns[i])
                risk_score += 0.3
        # Additional heuristics. These three helpers were referenced but
        # never defined in the original, so detection always raised
        # AttributeError; they are implemented below.
        if self._contains_system_keywords(user_input):
            risk_score += 0.2
        if self._has_unusual_formatting(user_input):
            risk_score += 0.1
        if self._contains_code_injection(user_input):
            risk_score += 0.4
        return {
            'is_potential_injection': risk_score > 0.5,
            'risk_score': min(risk_score, 1.0),
            'detected_patterns': detected_patterns,
            'mitigation_applied': risk_score > 0.7
        }

    def _contains_system_keywords(self, text: str) -> bool:
        """True if the text name-drops system/developer prompt machinery."""
        lowered = text.lower()
        keywords = ("system prompt", "developer message", "you are now", "jailbreak")
        return any(keyword in lowered for keyword in keywords)

    def _has_unusual_formatting(self, text: str) -> bool:
        """True for formatting often used to smuggle instructions:
        fenced code blocks, large blank gaps, or mostly-symbol content."""
        if "```" in text or "\n\n\n" in text:
            return True
        if not text:
            return False
        specials = sum(1 for ch in text
                       if not (ch.isalnum() or ch.isspace() or ch in ".,!?'\"-"))
        return specials / len(text) > 0.3

    def _contains_code_injection(self, text: str) -> bool:
        """True if the text carries executable-code markers."""
        lowered = text.lower()
        markers = ("<script", "eval(", "exec(", "os.system", "subprocess", "__import__")
        return any(marker in lowered for marker in markers)

    def sanitize_input(self, user_input: str, detection_result: dict) -> str:
        """Sanitize input based on detection results."""
        if not detection_result['is_potential_injection']:
            return user_input
        sanitized = user_input
        # Remove detected patterns
        for pattern in self.compiled_patterns:
            sanitized = pattern.sub('[REMOVED]', sanitized)
        # Add safety prefix
        if detection_result['mitigation_applied']:
            safety_prefix = "Please respond to the following user query only: "
            sanitized = safety_prefix + sanitized
        return sanitized
class SecureAgentWrapper:
    """Wraps an agent with injection detection, input sanitization, and
    output validation."""

    def __init__(self, base_agent: Agent):
        self.base_agent = base_agent
        self.injection_detector = PromptInjectionDetector()
        # NOTE(review): OutputValidator is defined elsewhere; assumed to
        # expose is_safe_output(str) -> bool — confirm.
        self.output_validator = OutputValidator()

    async def log_security_event(self, event_type: str, details: dict) -> None:
        """Record a security event for auditing.

        The original called this method without defining it, so the first
        detected injection raised AttributeError. NOTE(review): the payload
        includes the raw user input — route this to a restricted audit sink
        rather than general application logs.
        """
        logging.getLogger(__name__).warning("security event %s: %s", event_type, details)

    async def process_securely(self, user_input: str, **kwargs) -> str:
        """Process user input with security checks"""
        # Input validation and sanitization
        detection_result = self.injection_detector.detect_injection(user_input)
        if detection_result['mitigation_applied']:
            # Log security event
            await self.log_security_event('prompt_injection_detected', {
                'user_input': user_input,
                'detection_result': detection_result
            })
            # Decide whether to proceed or reject
            if detection_result['risk_score'] > 0.9:
                return "I cannot process this request as it appears to contain potentially harmful instructions."
        # Sanitize input
        sanitized_input = self.injection_detector.sanitize_input(
            user_input, detection_result
        )
        # Process with base agent
        response = await self.base_agent.process(sanitized_input, **kwargs)
        # Output validation
        if not self.output_validator.is_safe_output(response):
            return "I cannot provide a response to this query."
        return response

# 2. Data Privacy and PII Protection
import hashlib
from typing import Dict, List
class PIIProtector:
    """Detects and reversibly anonymizes PII (emails, phones, SSNs, cards)."""

    def __init__(self):
        # Regexes per PII category. The original email pattern used the
        # character class [A-Z|a-z], which erroneously accepts a literal '|'
        # in the top-level domain; fixed to [A-Za-z].
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}-?\d{3}-?\d{4}\b',
            'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b',
            'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'
        }
        self.anonymization_cache = {}

    def detect_pii(self, text: str) -> Dict[str, List[str]]:
        """Return {pii_type: [matches]} for every category found in text."""
        detected_pii = {}
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                detected_pii[pii_type] = matches
        return detected_pii

    def anonymize_text(self, text: str) -> tuple[str, dict]:
        """Anonymize PII in text with reversible tokens.

        Returns the anonymized text and a token -> {original, type} map so
        callers can restore values if needed.
        """
        anonymized_text = text
        anonymization_map = {}
        detected_pii = self.detect_pii(text)
        for pii_type, instances in detected_pii.items():
            for instance in instances:
                # Consistent token per value. md5 is used only as a stable,
                # non-security fingerprint for token construction.
                token_hash = hashlib.md5(instance.encode()).hexdigest()[:8]
                anonymous_token = f"[{pii_type.upper()}_{token_hash}]"
                # Store mapping for potential reversal (if needed)
                anonymization_map[anonymous_token] = {
                    'original': instance,
                    'type': pii_type
                }
                # Replace every occurrence of the value in the text.
                anonymized_text = anonymized_text.replace(instance, anonymous_token)
        return anonymized_text, anonymization_map
class PrivacyCompliantAgent:
    """Agent wrapper that strips PII before processing and before replying."""

    def __init__(self, base_agent: Agent):
        self.base_agent = base_agent
        self.pii_protector = PIIProtector()

    async def log_pii_detection(self, summary: dict) -> None:
        """Record that PII was encountered (types and counts only — never
        the values themselves).

        The original called this method without defining it, so any input
        containing PII raised AttributeError. Swap the logging call for the
        project's audit sink if one exists.
        """
        logging.getLogger(__name__).info("PII detected: %s", summary)

    async def process_with_privacy(self, user_input: str, **kwargs) -> str:
        """Process input while protecting PII"""
        # Detect and anonymize PII before it reaches the model.
        anonymized_input, anonymization_map = self.pii_protector.anonymize_text(user_input)
        # Log PII detection (without storing actual PII)
        if anonymization_map:
            await self.log_pii_detection({
                'pii_types_detected': list(set(
                    item['type'] for item in anonymization_map.values()
                )),
                'anonymization_count': len(anonymization_map)
            })
        # Process anonymized input
        response = await self.base_agent.process(anonymized_input, **kwargs)
        # Ensure the response doesn't leak PII either.
        anonymized_response, _ = self.pii_protector.anonymize_text(response)
        return anonymized_response

# Error Handling and Graceful Degradation
from enum import Enum
import circuit_breaker
class DegradationLevel(Enum):
    """Service quality tiers, from full capability down to outage."""

    NORMAL = "normal"    # full agent pipeline
    LIMITED = "limited"  # reduced context / backup path
    MINIMAL = "minimal"  # template- and rule-based replies only
    OFFLINE = "offline"  # canned fallback message
class GracefulDegradationManager:
    """Serves requests at a quality level matching current system health.

    Levels fall back from full agent processing (NORMAL), to a cheaper
    reduced-context path (LIMITED), to rule/template replies (MINIMAL),
    down to canned messages (OFFLINE).
    """

    def __init__(self):
        self.current_level = DegradationLevel.NORMAL
        # Canned replies keyed by request type, used when offline/erroring.
        self.fallback_responses = {
            'general': "I'm experiencing technical difficulties. Please try again later.",
            'customer_service': "I'm currently unable to process your request. Please contact support at support@company.com"
        }
        # Circuit breakers for external dependencies
        # NOTE(review): `circuit_breaker` is an external module; confirm its
        # CircuitBreaker accepts these arguments and supports use as a
        # synchronous `with` context manager (an async variant may be
        # needed inside coroutines).
        self.llm_circuit_breaker = circuit_breaker.CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60,
            expected_exception=Exception
        )

    async def process_with_degradation(self, request_data: dict) -> str:
        """Process request with appropriate degradation level"""
        try:
            # Check system health and adjust degradation level
            # (helper defined elsewhere in the project).
            await self._assess_system_health()
            if self.current_level == DegradationLevel.OFFLINE:
                return self._get_offline_response(request_data)
            elif self.current_level == DegradationLevel.MINIMAL:
                return await self._minimal_processing(request_data)
            elif self.current_level == DegradationLevel.LIMITED:
                return await self._limited_processing(request_data)
            else:  # NORMAL
                return await self._full_processing(request_data)
        except Exception as e:
            # Automatic degradation on errors; both error helpers are
            # expected to be provided elsewhere.
            await self._handle_processing_error(e)
            return self._get_error_response(request_data, e)

    async def _full_processing(self, request_data: dict) -> str:
        """Full agent processing with all features"""
        # NOTE(review): self.primary_agent is never assigned in __init__ —
        # presumably injected elsewhere; confirm before relying on this path.
        with self.llm_circuit_breaker:
            return await self.primary_agent.process(request_data)

    async def _limited_processing(self, request_data: dict) -> str:
        """Limited processing with reduced context/features"""
        # Use smaller model or reduced context
        # NOTE(review): _simplify_request and self.backup_agent are not
        # defined in this class — confirm where they are provided.
        simplified_request = self._simplify_request(request_data)
        return await self.backup_agent.process(simplified_request)

    async def _minimal_processing(self, request_data: dict) -> str:
        """Minimal processing using templates/rules"""
        # Use rule-based responses or simple templates
        intent = self._classify_intent_simple(request_data['input'])
        return self._get_template_response(intent)

    def _get_offline_response(self, request_data: dict) -> str:
        """Return the canned offline message for the request's type."""
        request_type = request_data.get('type', 'general')
        return self.fallback_responses.get(request_type, self.fallback_responses['general'])

# Cost Optimization and Budget Controls
class CostController:
    """Enforces system-wide and per-user daily spending/request limits."""

    def __init__(self):
        self.daily_budget = 1000.0  # $1000/day across the whole system
        # Per-tier daily caps; tiers not listed here fall back to 'free'.
        self.user_limits = {
            'free': {'daily_requests': 10, 'daily_cost': 1.0},
            'premium': {'daily_requests': 100, 'daily_cost': 10.0},
            'enterprise': {'daily_requests': 1000, 'daily_cost': 100.0}
        }
        self.cost_tracker = CostTracker()

    async def check_budget_limits(self, user_id: str, estimated_cost: float) -> dict:
        """Check if request is within budget limits.

        Returns {'allowed': True} or {'allowed': False, 'reason': ...} with
        the relevant spend figures attached.
        """
        # Check daily system budget first.
        daily_spent = await self.cost_tracker.get_daily_spending()
        if daily_spent + estimated_cost > self.daily_budget:
            return {
                'allowed': False,
                'reason': 'system_budget_exceeded',
                'daily_spent': daily_spent,
                'daily_budget': self.daily_budget
            }
        # Check user limits. Unknown tiers fall back to 'free' instead of
        # raising KeyError as the original did.
        # NOTE(review): get_user_tier is expected to be provided elsewhere.
        user_tier = await self.get_user_tier(user_id)
        user_limits = self.user_limits.get(user_tier, self.user_limits['free'])
        user_daily_cost = await self.cost_tracker.get_user_daily_cost(user_id)
        user_daily_requests = await self.cost_tracker.get_user_daily_requests(user_id)
        if user_daily_cost + estimated_cost > user_limits['daily_cost']:
            return {
                'allowed': False,
                'reason': 'user_cost_limit_exceeded',
                'user_daily_cost': user_daily_cost,
                'cost_limit': user_limits['daily_cost']
            }
        if user_daily_requests >= user_limits['daily_requests']:
            return {
                'allowed': False,
                'reason': 'user_request_limit_exceeded',
                'user_daily_requests': user_daily_requests,
                'request_limit': user_limits['daily_requests']
            }
        return {'allowed': True}

    async def record_cost(self, user_id: str, actual_cost: float) -> None:
        """Persist the realized cost of a request.

        CostOptimizedAgent calls this, but the original class never defined
        it (AttributeError). NOTE(review): assumes CostTracker exposes an
        async record_cost(user_id, cost) — confirm against the tracker API.
        """
        await self.cost_tracker.record_cost(user_id, actual_cost)
class CostOptimizedAgent:
    """Chooses the cheapest acceptable processing path within budget limits."""

    def __init__(self, agent: Agent):
        self.agent = agent
        self.cost_controller = CostController()
        self.model_router = ModelRouter()

    async def process_cost_optimized(self, request_data: dict) -> str:
        """Process request with cost optimization."""
        # Price out every available processing option up front.
        estimates = await self._estimate_processing_costs(request_data)
        cheapest = min(estimates.values())
        # Budget gate: even the cheapest option must fit remaining budget.
        verdict = await self.cost_controller.check_budget_limits(
            request_data['user_id'], cheapest
        )
        if not verdict['allowed']:
            return self._get_budget_exceeded_response(verdict)
        # Pick the method balancing cost against the request's requirements.
        method = self._select_processing_method(
            request_data, estimates, verdict
        )
        outcome = await self._process_with_method(request_data, method)
        # Record what the request actually cost, not the estimate.
        spent = await self._calculate_actual_cost(outcome)
        await self.cost_controller.record_cost(
            request_data['user_id'], spent
        )
        return outcome

# Maintenance and Model Updates
class ModelUpdateManager:
    """Stages, validates, gradually promotes, and rolls back model updates."""

    def __init__(self):
        self.current_models = {}      # model_name -> production version
        self.staged_models = {}       # model_name -> candidate version
        self.rollback_snapshots = {}  # model_name -> pre-update state

    async def stage_model_update(self, model_name: str, new_version: str):
        """Stage a new model version and run automated validation.

        Returns the validation results from _validate_staged_model.
        """
        # Snapshot current state for rollback. Using .get() keeps this
        # working the first time a model is staged — the original raised
        # KeyError when model_name had no production version yet.
        self.rollback_snapshots[model_name] = {
            'version': self.current_models.get(model_name),
            'timestamp': time.time(),
            'performance_baseline': await self._capture_performance_baseline(model_name)
        }
        # Stage new model
        self.staged_models[model_name] = new_version
        # Run automated validation (helper defined elsewhere).
        validation_results = await self._validate_staged_model(model_name)
        return validation_results

    async def promote_staged_model(self, model_name: str):
        """Promote staged model to production via a phased rollout.

        Raises:
            ValueError: if no staged version exists for model_name.
            RuntimeError: if any rollout phase fails its quality gates
                (a rollback is performed first).
        """
        if model_name not in self.staged_models:
            raise ValueError(f"No staged model found for {model_name}")
        # Gradual rollout: 5%, 10%, 25%, 50%, 100% of traffic.
        rollout_phases = [0.05, 0.1, 0.25, 0.5, 1.0]
        for phase in rollout_phases:
            await self._set_traffic_split(model_name, phase)
            # Monitor for issues during this rollout phase (1 hour).
            await self._monitor_rollout_phase(model_name, phase, duration=3600)
            phase_metrics = await self._get_phase_metrics(model_name, phase)
            if not self._phase_passed_quality_gates(phase_metrics):
                await self._rollback_model_update(model_name)
                # RuntimeError instead of the original bare Exception so
                # callers can catch something more specific (still a
                # subclass of Exception, so existing handlers keep working).
                raise RuntimeError(f"Model rollout failed at {phase*100}% phase")
        # Complete promotion
        self.current_models[model_name] = self.staged_models[model_name]
        del self.staged_models[model_name]

    async def _rollback_model_update(self, model_name: str):
        """Rollback model to previous version"""
        if model_name not in self.rollback_snapshots:
            raise ValueError(f"No rollback snapshot available for {model_name}")
        snapshot = self.rollback_snapshots[model_name]
        # Restore previous version by routing all traffic back to it.
        await self._set_traffic_split(model_name, 0.0)
        # Log rollback event
        await self._log_rollback_event(model_name, snapshot)
        # Clean up staged model
        if model_name in self.staged_models:
            del self.staged_models[model_name]

# Production Deployment Checklist
Pre-Deployment
- Comprehensive test suite passing (behavioral, regression, security)
- A/B testing framework configured
- Monitoring and alerting set up
- Cost controls and budgets implemented
- Security measures in place (PII protection, injection detection)
- Error handling and graceful degradation tested
- Rollback procedures documented and tested
Deployment
- Gradual rollout plan executed
- Real-time monitoring active
- Performance metrics within acceptable ranges
- Cost tracking functioning correctly
- User feedback collection enabled
Post-Deployment
- Performance baselines established
- Regular model update procedures documented
- Incident response procedures tested
- Long-term maintenance plan established
Conclusion
Deploying AI agents to production requires balancing innovation with reliability. The strategies outlined in this series—from design patterns through observability to production practices—provide a foundation for building systems that users can trust and operations teams can maintain.
The key insight: treat AI agents not as magic boxes, but as sophisticated software systems that require the same engineering rigor as any mission-critical application. Start simple, measure everything, and evolve gradually toward the complexity your use case demands.
Remember: the goal isn’t perfect agents—it’s agents that fail gracefully, learn from mistakes, and provide consistent value to users over time.