Understanding frontier models is only half the battle. The other half is integrating them into production applications that are reliable, cost-effective, and performant. This chapter covers the essential patterns and best practices for working with OpenAI, Anthropic, and Google's APIs in production environments.
Whether you're building a chatbot, document analysis system, or code generation tool, these patterns will help you avoid common pitfalls and build robust AI applications that scale.
1. API Authentication & Security
Secure API Key Storage
Never hardcode API keys in your source code. Always use environment variables or secure secret management systems.
# ❌ NEVER DO THIS - Hardcoded API key
import openai
client = openai.OpenAI(api_key="sk-proj-abc123...") # DANGEROUS!
# ✅ DO THIS - Environment variable
import os
from openai import OpenAI
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
client = OpenAI(api_key=api_key)
Setting Environment Variables
# Linux/Mac - Add to ~/.bashrc or ~/.zshrc
export OPENAI_API_KEY="sk-proj-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GOOGLE_API_KEY="AIza..."
# Or use a .env file (recommended for development)
# Create a file named .env in your project root:
OPENAI_API_KEY=sk-proj-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=AIza...
# Then load it with python-dotenv
pip install python-dotenv
# Load environment variables from .env file
from dotenv import load_dotenv
import os
load_dotenv() # Loads .env file
openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
google_key = os.getenv("GOOGLE_API_KEY")
Production Secret Management
For production deployments, use dedicated secret management services:
- AWS Secrets Manager: Integrated with AWS services, automatic rotation
- Google Cloud Secret Manager: Native GCP integration (see the sketch after the AWS example below)
- HashiCorp Vault: Platform-agnostic, enterprise-grade
- Azure Key Vault: For Azure deployments
# Example: AWS Secrets Manager
import boto3
import json
def get_secret(secret_name):
client = boto3.client('secretsmanager', region_name='us-east-1')
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
# Retrieve API keys securely
secrets = get_secret('prod/ai-api-keys')
openai_key = secrets['OPENAI_API_KEY']
anthropic_key = secrets['ANTHROPIC_API_KEY']
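The same pattern applies to the other services above. For comparison, here is a minimal sketch for Google Cloud Secret Manager, assuming the google-cloud-secret-manager client library is installed and that secrets with these names exist in your project (the project ID is illustrative):

# Example: Google Cloud Secret Manager (sketch - project and secret names are illustrative)
from google.cloud import secretmanager

def get_gcp_secret(project_id: str, secret_id: str, version: str = "latest") -> str:
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

# Retrieve API keys securely
openai_key = get_gcp_secret("my-gcp-project", "OPENAI_API_KEY")
anthropic_key = get_gcp_secret("my-gcp-project", "ANTHROPIC_API_KEY")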
🔒 Security Checklist
- ✅ Never commit API keys to version control (add .env to .gitignore)
- ✅ Use separate keys for development and production
- ✅ Rotate keys regularly (every 90 days minimum)
- ✅ Set usage limits in provider dashboards
- ✅ Monitor usage for anomalies
- ✅ Use API key restrictions (IP allowlists, referrer restrictions)
2. Rate Limiting & Quota Management
Understanding Provider Rate Limits
📊 Rate Limit Comparison (as of 2025)
| Provider | Tier | Requests/Min | Tokens/Min |
|---|---|---|---|
| OpenAI GPT-4 | Free | 3 | 40K |
| OpenAI GPT-4 | Tier 1 | 500 | 10M |
| Anthropic Claude | Free | 5 | 20K |
| Anthropic Claude | Tier 1 | 50 | 100K |
| Google Gemini | Free | 15 | 32K |
Implementing Client-Side Rate Limiting
import time
from collections import deque
from threading import Lock
class RateLimiter:
"""Token bucket rate limiter for API requests"""
def __init__(self, requests_per_minute: int, tokens_per_minute: int):
self.rpm = requests_per_minute
self.tpm = tokens_per_minute
self.request_timestamps = deque()
self.token_usage = deque()
self.lock = Lock()
def wait_if_needed(self, estimated_tokens: int = 0):
"""Block until request can be made within rate limits"""
with self.lock:
now = time.time()
# Remove timestamps older than 1 minute
cutoff = now - 60
while self.request_timestamps and self.request_timestamps[0] < cutoff:
self.request_timestamps.popleft()
while self.token_usage and self.token_usage[0][0] < cutoff:
self.token_usage.popleft()
# Check request rate
if len(self.request_timestamps) >= self.rpm:
sleep_time = 60 - (now - self.request_timestamps[0])
if sleep_time > 0:
print(f"Rate limit reached. Waiting {sleep_time:.1f}s...")
time.sleep(sleep_time)
# Check token rate
total_tokens = sum(tokens for _, tokens in self.token_usage)
            if self.token_usage and total_tokens + estimated_tokens > self.tpm:
sleep_time = 60 - (now - self.token_usage[0][0])
if sleep_time > 0:
print(f"Token limit reached. Waiting {sleep_time:.1f}s...")
time.sleep(sleep_time)
# Record this request
self.request_timestamps.append(time.time())
self.token_usage.append((time.time(), estimated_tokens))
# Usage
limiter = RateLimiter(requests_per_minute=3, tokens_per_minute=40000)
def make_api_call(prompt: str):
# Estimate tokens (rough: 1 token ≈ 4 chars)
estimated_tokens = len(prompt) // 4 + 500 # +500 for response
limiter.wait_if_needed(estimated_tokens)
# Make actual API call
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": prompt}]
)
return response
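The 4-characters-per-token heuristic is deliberately rough. If you need exact counts for OpenAI models, the tiktoken library can tokenize the prompt before you reserve budget in the limiter. A small sketch, assuming tiktoken is installed:

import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens with the same tokenizer OpenAI uses for this model"""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

# Reserve budget using an exact count instead of the rough estimate
prompt = "Explain quantum computing"
limiter.wait_if_needed(count_tokens(prompt) + 500)  # +500 for the response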
Handling 429 Rate Limit Errors
import time
import openai
from openai import OpenAI
client = OpenAI()
def api_call_with_retry(prompt: str, max_retries: int = 5):
"""Make API call with exponential backoff on rate limit errors"""
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": prompt}]
)
return response
except openai.RateLimitError as e:
if attempt == max_retries - 1:
raise # Re-raise on final attempt
            # Exponential backoff: 2**attempt seconds (1, 2, 4, 8)
wait_time = 2 ** attempt
print(f"Rate limit hit. Waiting {wait_time}s before retry...")
time.sleep(wait_time)
except openai.APIError as e:
print(f"API error: {e}")
raise
# Usage
response = api_call_with_retry("Explain quantum computing")
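One refinement worth considering: if many clients back off on the same fixed schedule, their retries collide and hit the rate limit together. Adding random jitter spreads retries out. A minimal sketch of the wait calculation, usable as a drop-in replacement for the time.sleep(wait_time) call above:

import random
import time

def backoff_with_jitter(attempt: int, cap: float = 60.0) -> None:
    """Sleep a random duration between 0 and min(cap, 2**attempt) seconds ("full jitter")"""
    time.sleep(random.uniform(0, min(cap, 2 ** attempt)))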
3. Error Handling & Retry Strategies
Comprehensive Error Handling
import openai
from openai import OpenAI
import time
from typing import Optional
client = OpenAI()
def make_robust_api_call(
prompt: str,
model: str = "gpt-4-turbo-preview",
max_retries: int = 3,
timeout: int = 60
) -> Optional[str]:
"""
Production-ready API call with comprehensive error handling
"""
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
timeout=timeout
)
return response.choices[0].message.content
except openai.RateLimitError:
# Transient - retry with backoff
if attempt < max_retries - 1:
wait = 2 ** attempt
print(f"Rate limited. Retrying in {wait}s...")
time.sleep(wait)
else:
print("Rate limit persists after retries")
raise
except openai.APIConnectionError as e:
# Transient - network issue
if attempt < max_retries - 1:
wait = 2 ** attempt
print(f"Connection error: {e}. Retrying in {wait}s...")
time.sleep(wait)
else:
print("Connection failed after retries")
raise
except openai.APITimeoutError:
# Transient - service overloaded
if attempt < max_retries - 1:
print(f"Timeout. Retrying attempt {attempt + 2}...")
time.sleep(2)
else:
print("Request timed out after retries")
raise
except openai.AuthenticationError:
# Permanent - don't retry
print("Authentication failed. Check API key.")
raise
except openai.BadRequestError as e:
# Permanent - malformed request
print(f"Invalid request: {e}")
raise
except openai.InternalServerError:
# Transient - OpenAI service issue
if attempt < max_retries - 1:
print("OpenAI server error. Retrying...")
time.sleep(5)
else:
print("Server error persists")
raise
except Exception as e:
# Unexpected error - log and raise
print(f"Unexpected error: {type(e).__name__}: {e}")
raise
return None
# Usage
try:
result = make_robust_api_call("Summarize quantum computing")
if result:
print(result)
except Exception as e:
# Log to monitoring system
print(f"Failed to get AI response: {e}")
Error Handling for Claude (Anthropic)
import time

import anthropic
from anthropic import Anthropic

client = Anthropic()
def call_claude_with_retry(prompt: str, max_retries: int = 3):
"""Claude-specific error handling"""
for attempt in range(max_retries):
try:
message = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
return message.content[0].text
except anthropic.RateLimitError as e:
if attempt < max_retries - 1:
wait = 2 ** attempt
print(f"Claude rate limit. Waiting {wait}s...")
time.sleep(wait)
else:
raise
        except anthropic.APIStatusError as e:
            # Anthropic returns HTTP 529 when the service is overloaded - back off and retry
            if e.status_code == 529 and attempt < max_retries - 1:
                time.sleep(10)
                continue
            raise
except anthropic.APIConnectionError:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
else:
raise
# Usage
response = call_claude_with_retry("Explain neural networks")
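Gemini calls can be wrapped the same way. A minimal sketch, assuming the google-generativeai package is installed and that quota errors surface as google-api-core's ResourceExhausted exception (the 429 equivalent):

import os
import time

import google.generativeai as genai
from google.api_core import exceptions as google_exceptions

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
model = genai.GenerativeModel("gemini-1.5-pro")

def call_gemini_with_retry(prompt: str, max_retries: int = 3) -> str:
    """Gemini-specific retry wrapper (sketch)"""
    for attempt in range(max_retries):
        try:
            return model.generate_content(prompt).text
        except google_exceptions.ResourceExhausted:  # quota or rate limit exceeded
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)
        except google_exceptions.ServiceUnavailable:  # transient server-side error
            if attempt == max_retries - 1:
                raise
            time.sleep(5)

# Usage
response = call_gemini_with_retry("Explain neural networks")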
4. Cost Optimization Techniques
Current Pricing (2025)
💰 Cost Comparison
| Model | Input ($/1M) | Output ($/1M) | Best For |
|---|---|---|---|
| GPT-4 Turbo | $10 | $30 | Complex reasoning |
| GPT-3.5 Turbo | $0.50 | $1.50 | Simple tasks (20x cheaper!) |
| Claude 3.5 Sonnet | $3 | $15 | Long documents, coding |
| Claude 3 Haiku | $0.25 | $1.25 | Fast, lightweight tasks |
| Gemini 1.5 Pro | $3.50 | $10.50 | Video, massive context |
1. Intelligent Model Selection
def choose_model(task_complexity: str, context_length: int) -> str:
"""
Select the most cost-effective model for the task
"""
if task_complexity == "simple" and context_length < 4000:
# 20x cheaper than GPT-4
return "gpt-3.5-turbo"
elif task_complexity == "moderate" and context_length < 8000:
# Best price/performance for general tasks
return "claude-3-5-sonnet-20241022"
    elif context_length > 100000:
        # Needs a very large context window (Claude 3.5 Sonnet: 200K tokens)
        return "claude-3-5-sonnet-20241022"
elif "video" in task_complexity or "audio" in task_complexity:
# Native multimodal
return "gemini-1.5-pro"
else:
# Complex reasoning
return "gpt-4-turbo-preview"
# Example usage
model = choose_model("simple", 500)
print(f"Using {model} for cost efficiency")
2. Prompt Optimization
# ❌ BAD - Verbose prompt (expensive)
verbose_prompt = """
I would like you to please analyze this document for me.
Could you take your time and really think about it carefully?
I need you to extract all the important information from it.
Please be thorough and don't miss anything important.
Also, please format your response nicely with bullet points.
Thank you so much for your help with this task!
Document: [10,000 word document]
"""
# ✅ GOOD - Concise prompt (cheaper)
concise_prompt = """
Analyze and extract key information from this document:
Document: [10,000 word document]
Format: Bullet points
"""
# Savings: ~100 tokens = $0.001 per request
# At 10M requests/year: $10,000 saved!
3. Response Length Control
import openai
from openai import OpenAI
client = OpenAI()
# Limit output tokens to control costs
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": "Explain AI in detail"}],
max_tokens=500, # Prevents overly long responses
temperature=0.7
)
# Calculate cost from the token usage the API reports
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
cost = (input_tokens * 10 + output_tokens * 30) / 1_000_000  # GPT-4 Turbo: $10/$30 per 1M tokens
print(f"Estimated cost: ${cost:.6f}")
4. Caching Responses
import hashlib
import json
from typing import Optional
class ResponseCache:
"""Simple in-memory cache for API responses"""
def __init__(self):
self.cache = {}
def get_cache_key(self, prompt: str, model: str) -> str:
"""Generate cache key from prompt and model"""
content = f"{model}:{prompt}"
return hashlib.md5(content.encode()).hexdigest()
def get(self, prompt: str, model: str) -> Optional[str]:
"""Retrieve cached response"""
key = self.get_cache_key(prompt, model)
return self.cache.get(key)
def set(self, prompt: str, model: str, response: str):
"""Store response in cache"""
key = self.get_cache_key(prompt, model)
self.cache[key] = response
# Usage
cache = ResponseCache()
def cached_api_call(prompt: str, model: str = "gpt-4-turbo-preview"):
# Check cache first
cached = cache.get(prompt, model)
if cached:
print("✓ Cache hit - $0.00 cost")
return cached
# Make API call
print("✗ Cache miss - making API call")
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
result = response.choices[0].message.content
cache.set(prompt, model, result)
return result
# First call: API request
response1 = cached_api_call("What is AI?")
# Second call: Cached (free!)
response2 = cached_api_call("What is AI?")
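One caveat with the in-memory cache above: entries never expire, so stale answers can linger and memory grows without bound. A small sketch of a time-to-live (TTL) variant; the one-hour default is an arbitrary choice:

import time

class TTLResponseCache(ResponseCache):
    """ResponseCache variant whose entries expire after ttl_seconds"""
    def __init__(self, ttl_seconds: int = 3600):
        super().__init__()
        self.ttl = ttl_seconds
    def get(self, prompt: str, model: str) -> Optional[str]:
        entry = self.cache.get(self.get_cache_key(prompt, model))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at > self.ttl:
            return None  # expired - caller will refresh via a new API call
        return response
    def set(self, prompt: str, model: str, response: str):
        self.cache[self.get_cache_key(prompt, model)] = (time.time(), response)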
5. Cost Tracking
import time
from dataclasses import dataclass
from typing import List
@dataclass
class APICall:
timestamp: float
model: str
input_tokens: int
output_tokens: int
cost: float
class CostTracker:
"""Track API usage and costs"""
# Pricing per 1M tokens
PRICING = {
"gpt-4-turbo-preview": {"input": 10, "output": 30},
"gpt-3.5-turbo": {"input": 0.5, "output": 1.5},
"claude-3-5-sonnet-20241022": {"input": 3, "output": 15},
}
def __init__(self):
self.calls: List[APICall] = []
def log_call(self, model: str, input_tokens: int, output_tokens: int):
"""Log an API call and calculate cost"""
pricing = self.PRICING.get(model, {"input": 10, "output": 30})
cost = (
(input_tokens * pricing["input"] / 1_000_000) +
(output_tokens * pricing["output"] / 1_000_000)
)
call = APICall(
timestamp=time.time(),
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost
)
self.calls.append(call)
return cost
def get_total_cost(self) -> float:
"""Get total cost across all calls"""
return sum(call.cost for call in self.calls)
def get_cost_by_model(self) -> dict:
"""Get cost breakdown by model"""
breakdown = {}
for call in self.calls:
if call.model not in breakdown:
breakdown[call.model] = 0
breakdown[call.model] += call.cost
return breakdown
# Usage
tracker = CostTracker()
# Make API call and track cost
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": "Hello"}]
)
cost = tracker.log_call(
model="gpt-4-turbo-preview",
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens
)
print(f"Call cost: ${cost:.6f}")
print(f"Total cost today: ${tracker.get_total_cost():.6f}")
5. Monitoring & Logging
Comprehensive Logging
import logging
import time
import json
from typing import Dict, Any
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('ai_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def log_api_call(
model: str,
prompt: str,
response: str,
latency: float,
tokens: Dict[str, int],
cost: float,
error: str = None
):
"""Log comprehensive API call information"""
log_data = {
"timestamp": time.time(),
"model": model,
"prompt_length": len(prompt),
"response_length": len(response) if response else 0,
"latency_ms": round(latency * 1000, 2),
"input_tokens": tokens.get("input", 0),
"output_tokens": tokens.get("output", 0),
"cost_usd": round(cost, 6),
"error": error
}
if error:
logger.error(f"API call failed: {json.dumps(log_data)}")
else:
logger.info(f"API call succeeded: {json.dumps(log_data)}")
def monitored_api_call(prompt: str, model: str = "gpt-4-turbo-preview"):
"""Make API call with comprehensive monitoring"""
start_time = time.time()
error = None
response = None
tokens = {"input": 0, "output": 0}
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
tokens = {
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens
}
result = response.choices[0].message.content
except Exception as e:
error = str(e)
result = None
finally:
latency = time.time() - start_time
        # Calculate cost (fall back to GPT-4 Turbo pricing for unknown models)
        pricing = {"gpt-4-turbo-preview": {"input": 10, "output": 30}}
        rates = pricing.get(model, {"input": 10, "output": 30})
        cost = (
            (tokens["input"] * rates["input"] / 1_000_000) +
            (tokens["output"] * rates["output"] / 1_000_000)
        )
# Log everything
log_api_call(
model=model,
prompt=prompt,
response=result or "",
latency=latency,
tokens=tokens,
cost=cost,
error=error
)
if error:
raise Exception(error)
return result
# Usage
response = monitored_api_call("Explain machine learning")
Performance Monitoring
from collections import defaultdict
from typing import List
import statistics
class PerformanceMonitor:
"""Monitor API performance metrics"""
def __init__(self):
self.latencies: List[float] = []
self.errors_by_type = defaultdict(int)
self.calls_by_model = defaultdict(int)
def record_latency(self, latency: float):
"""Record API call latency"""
self.latencies.append(latency)
def record_error(self, error_type: str):
"""Record error occurrence"""
self.errors_by_type[error_type] += 1
def record_call(self, model: str):
"""Record API call by model"""
self.calls_by_model[model] += 1
def get_stats(self) -> dict:
"""Get performance statistics"""
if not self.latencies:
return {"message": "No data yet"}
return {
"total_calls": len(self.latencies),
"avg_latency_ms": round(statistics.mean(self.latencies) * 1000, 2),
"p50_latency_ms": round(statistics.median(self.latencies) * 1000, 2),
"p95_latency_ms": round(
sorted(self.latencies)[int(len(self.latencies) * 0.95)] * 1000, 2
),
"error_rate": sum(self.errors_by_type.values()) / len(self.latencies),
"errors_by_type": dict(self.errors_by_type),
"calls_by_model": dict(self.calls_by_model)
}
# Usage
monitor = PerformanceMonitor()
start = time.time()
try:
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": "Hello"}]
)
monitor.record_latency(time.time() - start)
monitor.record_call("gpt-4-turbo-preview")
except Exception as e:
monitor.record_error(type(e).__name__)
print(monitor.get_stats())
6. Production Deployment Patterns
Pattern 1: Asynchronous Processing
import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()
async def process_batch(prompts: List[str]) -> List[str]:
"""Process multiple prompts concurrently"""
async def process_one(prompt: str) -> str:
response = await client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
# Process all prompts concurrently
tasks = [process_one(prompt) for prompt in prompts]
results = await asyncio.gather(*tasks)
return results
# Usage
prompts = [
"Summarize article 1",
"Summarize article 2",
"Summarize article 3"
]
results = asyncio.run(process_batch(prompts))
# Much faster than sequential processing!
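Unbounded concurrency can blow straight through the rate limits from section 2, so it is common to cap how many requests are in flight at once. A sketch using asyncio.Semaphore; the limit of 5 concurrent requests is an arbitrary example:

async def process_batch_bounded(prompts: List[str], max_concurrent: int = 5) -> List[str]:
    """Like process_batch, but at most max_concurrent requests run at a time"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(prompt: str) -> str:
        async with semaphore:  # wait here if max_concurrent calls are already running
            response = await client.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content

    return await asyncio.gather(*[process_one(p) for p in prompts])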
Pattern 2: Queue-Based Processing
from queue import Queue, Empty
from threading import Thread
import time
class APIWorker:
"""Background worker for processing API requests"""
def __init__(self, num_workers: int = 3):
self.queue = Queue()
self.workers = []
self.running = True
for _ in range(num_workers):
worker = Thread(target=self._worker_loop)
worker.daemon = True
worker.start()
self.workers.append(worker)
def _worker_loop(self):
"""Worker thread loop"""
while self.running:
try:
task = self.queue.get(timeout=1)
if task is None:
break
prompt, callback = task
# Process request
try:
response = client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": prompt}]
)
result = response.choices[0].message.content
callback(result, None)
except Exception as e:
callback(None, str(e))
self.queue.task_done()
            except Empty:
                continue
def submit(self, prompt: str, callback):
"""Submit a request to the queue"""
self.queue.put((prompt, callback))
def shutdown(self):
"""Shutdown all workers"""
self.running = False
for _ in self.workers:
self.queue.put(None)
# Usage
worker = APIWorker(num_workers=3)
def on_complete(result, error):
if error:
print(f"Error: {error}")
else:
print(f"Result: {result[:100]}...")
worker.submit("Explain AI", on_complete)
worker.submit("Explain ML", on_complete)
worker.submit("Explain DL", on_complete)
# Wait for completion
time.sleep(10)
worker.shutdown()
Pattern 3: Circuit Breaker
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
"""Circuit breaker pattern for API resilience"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
print("Circuit breaker entering HALF_OPEN state")
else:
raise Exception("Circuit breaker is OPEN - API is failing")
try:
result = func(*args, **kwargs)
# Success - reset if in HALF_OPEN
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
print("Circuit breaker CLOSED - API recovered")
return result
except self.expected_exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit breaker OPEN after {self.failure_count} failures")
raise
# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def api_call():
return client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[{"role": "user", "content": "Hello"}]
)
try:
response = breaker.call(api_call)
except Exception as e:
print(f"Call failed: {e}")
✅ Key Takeaways
- Never hardcode API keys - use environment variables or secret managers
- Implement client-side rate limiting to avoid 429 errors
- Use exponential backoff for transient errors, but don't retry permanent errors
- Choose the cheapest model that meets your requirements (GPT-3.5 is 20x cheaper than GPT-4)
- Cache responses when appropriate to eliminate API costs
- Track costs, latencies, and errors in production
- Use async processing for better throughput
- Implement circuit breakers for resilience