MODULE 1 - CHAPTER 5 ⏱️ 40 min read 📖 3,000 words

API Integration & Best Practices

Production-ready patterns for building reliable, cost-effective AI applications

Understanding frontier models is only half the battle. The other half is integrating them into production applications that are reliable, cost-effective, and performant. This chapter covers the essential patterns and best practices for working with OpenAI, Anthropic, and Google's APIs in production environments.

Whether you're building a chatbot, document analysis system, or code generation tool, these patterns will help you avoid common pitfalls and build robust AI applications that scale.

1. API Authentication & Security

API Key Management
API keys are the primary authentication mechanism for LLM APIs. They're essentially long, randomly generated strings that prove your identity to the API provider. Proper key management is critical for security and cost control.
Why it matters: A leaked API key can result in unauthorized usage charges (potentially thousands of dollars), data breaches, and service disruption. Many developers have learned this the hard way.

Secure API Key Storage

Never hardcode API keys in your source code. Always use environment variables or secure secret management systems.

# ❌ NEVER DO THIS - Hardcoded API key
import openai
client = openai.OpenAI(api_key="sk-proj-abc123...")  # DANGEROUS!

# ✅ DO THIS - Environment variable
import os
from openai import OpenAI

api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable not set")

client = OpenAI(api_key=api_key)

Setting Environment Variables

# Linux/Mac - Add to ~/.bashrc or ~/.zshrc
export OPENAI_API_KEY="sk-proj-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GOOGLE_API_KEY="AIza..."

# Or use a .env file (recommended for development)
# Create a file named .env in your project root:
OPENAI_API_KEY=sk-proj-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=AIza...

# Install python-dotenv, then load the .env file from Python
pip install python-dotenv

# Load environment variables from .env file
from dotenv import load_dotenv
import os

load_dotenv()  # Loads .env file

openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
google_key = os.getenv("GOOGLE_API_KEY")

Production Secret Management

For production deployments, use dedicated secret management services:

  • AWS Secrets Manager: Integrated with AWS services, automatic rotation
  • Google Cloud Secret Manager: Native GCP integration
  • HashiCorp Vault: Platform-agnostic, enterprise-grade
  • Azure Key Vault: For Azure deployments
# Example: AWS Secrets Manager
import boto3
import json

def get_secret(secret_name):
    client = boto3.client('secretsmanager', region_name='us-east-1')
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

# Retrieve API keys securely
secrets = get_secret('prod/ai-api-keys')
openai_key = secrets['OPENAI_API_KEY']
anthropic_key = secrets['ANTHROPIC_API_KEY']

🔒 Security Checklist

  • ✅ Never commit API keys to version control (add .env to .gitignore; see the scan sketch after this list)
  • ✅ Use separate keys for development and production
  • ✅ Rotate keys regularly (every 90 days minimum)
  • ✅ Set usage limits in provider dashboards
  • ✅ Monitor usage for anomalies
  • ✅ Use API key restrictions (IP allowlists, referrer restrictions)
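
To back up the first item in the checklist, here is a minimal pre-commit scan you could adapt. It is a sketch, not a vetted secret scanner: the file name pre_commit_key_scan.py and the regexes are illustrative (they only cover the key prefixes shown in this chapter), and a real project should prefer a dedicated tool such as gitleaks or trufflehog.

# pre_commit_key_scan.py - hypothetical helper; run before committing
import re
import subprocess
import sys
from typing import List

# Key prefixes used in this chapter; extend for other providers
KEY_PATTERNS = [
    re.compile(r"sk-proj-[A-Za-z0-9_-]{10,}"),   # OpenAI project keys
    re.compile(r"sk-ant-[A-Za-z0-9_-]{10,}"),    # Anthropic keys
    re.compile(r"AIza[A-Za-z0-9_-]{10,}"),       # Google API keys
]

def staged_files() -> List[str]:
    """List files currently staged for commit."""
    out = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True, text=True, check=True
    )
    return [f for f in out.stdout.splitlines() if f]

def contains_key(path: str) -> bool:
    """Return True if the file appears to contain an API key."""
    try:
        text = open(path, encoding="utf-8", errors="ignore").read()
    except OSError:
        return False
    return any(pattern.search(text) for pattern in KEY_PATTERNS)

if __name__ == "__main__":
    leaked = [f for f in staged_files() if contains_key(f)]
    if leaked:
        print("Possible API keys found in:", ", ".join(leaked))
        sys.exit(1)  # Non-zero exit blocks the commit when used as a hook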

2. Rate Limiting & Quota Management

Rate Limits
Rate limits control how many requests you can make to an API within a given time period. LLM providers typically set limits on requests per minute (RPM) and tokens per minute (TPM) to ensure fair usage and system stability.
Example: OpenAI's free tier allows 3 RPM and 40,000 TPM for GPT-4. If you exceed this, you'll receive a 429 "Too Many Requests" error.

Understanding Provider Rate Limits

📊 Rate Limit Comparison (as of 2025)

Provider           Tier     Requests/Min   Tokens/Min
OpenAI GPT-4       Free     3              40K
OpenAI GPT-4       Tier 1   500            10M
Anthropic Claude   Free     5              20K
Anthropic Claude   Tier 1   50             100K
Google Gemini      Free     15             32K
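
These numbers change as providers adjust their tiers, so treat the table as illustrative. OpenAI also reports your live allowance in response headers, which you can read through the SDK's raw-response interface. A sketch; the header names follow OpenAI's documentation at the time of writing and may change:

from openai import OpenAI

client = OpenAI()

# The raw-response interface exposes HTTP headers alongside the parsed body
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "ping"}],
)

completion = raw.parse()  # The usual ChatCompletion object
print("Requests remaining:", raw.headers.get("x-ratelimit-remaining-requests"))
print("Tokens remaining:  ", raw.headers.get("x-ratelimit-remaining-tokens"))
print("Requests reset in: ", raw.headers.get("x-ratelimit-reset-requests"))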

Implementing Client-Side Rate Limiting

import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Token bucket rate limiter for API requests"""

    def __init__(self, requests_per_minute: int, tokens_per_minute: int):
        self.rpm = requests_per_minute
        self.tpm = tokens_per_minute
        self.request_timestamps = deque()
        self.token_usage = deque()
        self.lock = Lock()

    def wait_if_needed(self, estimated_tokens: int = 0):
        """Block until request can be made within rate limits"""
        with self.lock:
            now = time.time()

            # Remove timestamps older than 1 minute
            cutoff = now - 60
            while self.request_timestamps and self.request_timestamps[0] < cutoff:
                self.request_timestamps.popleft()
            while self.token_usage and self.token_usage[0][0] < cutoff:
                self.token_usage.popleft()

            # Check request rate
            if len(self.request_timestamps) >= self.rpm:
                sleep_time = 60 - (now - self.request_timestamps[0])
                if sleep_time > 0:
                    print(f"Rate limit reached. Waiting {sleep_time:.1f}s...")
                    time.sleep(sleep_time)

            # Check token rate
            total_tokens = sum(tokens for _, tokens in self.token_usage)
            if total_tokens + estimated_tokens > self.tpm:
                sleep_time = 60 - (now - self.token_usage[0][0])
                if sleep_time > 0:
                    print(f"Token limit reached. Waiting {sleep_time:.1f}s...")
                    time.sleep(sleep_time)

            # Record this request
            self.request_timestamps.append(time.time())
            self.token_usage.append((time.time(), estimated_tokens))

# Usage
limiter = RateLimiter(requests_per_minute=3, tokens_per_minute=40000)

def make_api_call(prompt: str):
    # Estimate tokens (rough: 1 token ≈ 4 chars)
    estimated_tokens = len(prompt) // 4 + 500  # +500 for response

    limiter.wait_if_needed(estimated_tokens)

    # Make actual API call
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": prompt}]
    )
    return response

Handling 429 Rate Limit Errors

import time
import openai
from openai import OpenAI

client = OpenAI()

def api_call_with_retry(prompt: str, max_retries: int = 5):
    """Make API call with exponential backoff on rate limit errors"""

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[{"role": "user", "content": prompt}]
            )
            return response

        except openai.RateLimitError as e:
            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt

            # Exponential backoff: 2^attempt seconds (1, 2, 4, 8, 16)
            wait_time = 2 ** attempt
            print(f"Rate limit hit. Waiting {wait_time}s before retry...")
            time.sleep(wait_time)

        except openai.APIError as e:
            print(f"API error: {e}")
            raise

# Usage
response = api_call_with_retry("Explain quantum computing")
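
If you would rather not hand-roll the backoff loop, the tenacity library expresses the same policy declaratively. A sketch with illustrative parameters: jittered exponential backoff, five attempts, retrying only on rate-limit errors.

import openai
from openai import OpenAI
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

client = OpenAI()

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),  # Only retry 429s
    wait=wait_random_exponential(min=1, max=30),           # Jittered exponential backoff
    stop=stop_after_attempt(5),                            # Then give up and re-raise
)
def api_call_with_tenacity(prompt: str):
    return client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": prompt}],
    )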

3. Error Handling & Retry Strategies

Transient vs. Permanent Errors
Transient errors are temporary issues (network glitches, service overload) that may succeed if retried. Permanent errors (invalid API key, malformed request) will never succeed and should not be retried.
Why it matters: Retrying permanent errors wastes time and resources. Not retrying transient errors leads to unnecessary failures in production.

Comprehensive Error Handling

import openai
from openai import OpenAI
import time
from typing import Optional

client = OpenAI()

def make_robust_api_call(
    prompt: str,
    model: str = "gpt-4-turbo-preview",
    max_retries: int = 3,
    timeout: int = 60
) -> Optional[str]:
    """
    Production-ready API call with comprehensive error handling
    """

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                timeout=timeout
            )
            return response.choices[0].message.content

        except openai.RateLimitError:
            # Transient - retry with backoff
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Rate limited. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                print("Rate limit persists after retries")
                raise

        except openai.APIConnectionError as e:
            # Transient - network issue
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Connection error: {e}. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                print("Connection failed after retries")
                raise

        except openai.APITimeoutError:
            # Transient - service overloaded
            if attempt < max_retries - 1:
                print(f"Timeout. Retrying attempt {attempt + 2}...")
                time.sleep(2)
            else:
                print("Request timed out after retries")
                raise

        except openai.AuthenticationError:
            # Permanent - don't retry
            print("Authentication failed. Check API key.")
            raise

        except openai.BadRequestError as e:
            # Permanent - malformed request
            print(f"Invalid request: {e}")
            raise

        except openai.InternalServerError:
            # Transient - OpenAI service issue
            if attempt < max_retries - 1:
                print("OpenAI server error. Retrying...")
                time.sleep(5)
            else:
                print("Server error persists")
                raise

        except Exception as e:
            # Unexpected error - log and raise
            print(f"Unexpected error: {type(e).__name__}: {e}")
            raise

    return None

# Usage
try:
    result = make_robust_api_call("Summarize quantum computing")
    if result:
        print(result)
except Exception as e:
    # Log to monitoring system
    print(f"Failed to get AI response: {e}")

Error Handling for Claude (Anthropic)

import time

import anthropic
from anthropic import Anthropic

client = Anthropic()

def call_claude_with_retry(prompt: str, max_retries: int = 3):
    """Claude-specific error handling"""

    for attempt in range(max_retries):
        try:
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=4096,
                messages=[{"role": "user", "content": prompt}]
            )
            return message.content[0].text

        except anthropic.RateLimitError as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Claude rate limit. Waiting {wait}s...")
                time.sleep(wait)
            else:
                raise

        except anthropic.APIStatusError as e:
            # Check the HTTP status code for specific handling
            if e.status_code == 529:  # Overloaded
                if attempt < max_retries - 1:
                    time.sleep(10)
                    continue
            raise

        except anthropic.APIConnectionError:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

# Usage
response = call_claude_with_retry("Explain neural networks")

4. Cost Optimization Techniques

Token-Based Pricing
All major LLM providers charge based on tokens processed. Input tokens (your prompt) and output tokens (the model's response) are typically priced differently, with output tokens costing 2-3x more.
Example: GPT-4 Turbo costs $10/1M input tokens and $30/1M output tokens. A 1,000-token prompt with a 500-token response costs: (1000 × $10 + 500 × $30) / 1M = $0.025
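
The arithmetic above is easy to wrap in a helper so you can sanity-check a workload before running it. The default rates are the ones quoted in this chapter; replace them with the provider's current price list.

def estimate_cost(input_tokens: int, output_tokens: int,
                  input_per_million: float = 10.0,
                  output_per_million: float = 30.0) -> float:
    """Estimate request cost in USD from token counts and per-million rates."""
    return (input_tokens * input_per_million +
            output_tokens * output_per_million) / 1_000_000

# The worked example from above: 1,000 input + 500 output tokens on GPT-4 Turbo
print(f"${estimate_cost(1000, 500):.3f}")  # $0.025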

Current Pricing (2025)

💰 Cost Comparison

Model               Input ($/1M)   Output ($/1M)   Best For
GPT-4 Turbo         $10            $30             Complex reasoning
GPT-3.5 Turbo       $0.50          $1.50           Simple tasks (20x cheaper!)
Claude 3.5 Sonnet   $3             $15             Long documents, coding
Claude 3 Haiku      $0.25          $1.25           Fast, lightweight tasks
Gemini 1.5 Pro      $3.50          $10.50          Video, massive context

1. Intelligent Model Selection

def choose_model(task_complexity: str, context_length: int) -> str:
    """
    Select the most cost-effective model for the task
    """
    if task_complexity == "simple" and context_length < 4000:
        # 20x cheaper than GPT-4
        return "gpt-3.5-turbo"

    elif task_complexity == "moderate" and context_length < 8000:
        # Best price/performance for general tasks
        return "claude-3-5-sonnet-20241022"

    elif context_length > 100000:
        # Claude's 200K-token context window handles very long inputs
        return "claude-3-5-sonnet-20241022"

    elif "video" in task_complexity or "audio" in task_complexity:
        # Native multimodal
        return "gemini-1.5-pro"

    else:
        # Complex reasoning
        return "gpt-4-turbo-preview"

# Example usage
model = choose_model("simple", 500)
print(f"Using {model} for cost efficiency")

2. Prompt Optimization

# ❌ BAD - Verbose prompt (expensive)
verbose_prompt = """
I would like you to please analyze this document for me.
Could you take your time and really think about it carefully?
I need you to extract all the important information from it.
Please be thorough and don't miss anything important.
Also, please format your response nicely with bullet points.
Thank you so much for your help with this task!

Document: [10,000 word document]
"""

# ✅ GOOD - Concise prompt (cheaper)
concise_prompt = """
Analyze and extract key information from this document:

Document: [10,000 word document]

Format: Bullet points
"""

# Savings: ~100 tokens = $0.001 per request
# At 10M requests/year: $10,000 saved!
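
The len(prompt) // 4 estimate used earlier is rough. For OpenAI models you can count tokens exactly with the tiktoken library before sending a request; a sketch (install with pip install tiktoken, and note that the model-to-encoding mapping can change):

import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count the tokens a prompt will consume for a given OpenAI model."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

prompt = "Analyze and extract key information from this document: ..."
print(f"{count_tokens(prompt)} tokens")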

3. Response Length Control

import openai
from openai import OpenAI

client = OpenAI()

# Limit output tokens to control costs
response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "Explain AI in detail"}],
    max_tokens=500,  # Prevents overly long responses
    temperature=0.7
)

# Calculate cost from the exact token counts the API reports
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens

cost = (input_tokens * 10 + output_tokens * 30) / 1_000_000
print(f"Estimated cost: ${cost:.6f}")

4. Caching Responses

import hashlib
import json
from typing import Optional

class ResponseCache:
    """Simple in-memory cache for API responses"""

    def __init__(self):
        self.cache = {}

    def get_cache_key(self, prompt: str, model: str) -> str:
        """Generate cache key from prompt and model"""
        content = f"{model}:{prompt}"
        return hashlib.md5(content.encode()).hexdigest()

    def get(self, prompt: str, model: str) -> Optional[str]:
        """Retrieve cached response"""
        key = self.get_cache_key(prompt, model)
        return self.cache.get(key)

    def set(self, prompt: str, model: str, response: str):
        """Store response in cache"""
        key = self.get_cache_key(prompt, model)
        self.cache[key] = response

# Usage
cache = ResponseCache()

def cached_api_call(prompt: str, model: str = "gpt-4-turbo-preview"):
    # Check cache first
    cached = cache.get(prompt, model)
    if cached:
        print("✓ Cache hit - $0.00 cost")
        return cached

    # Make API call
    print("✗ Cache miss - making API call")
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )

    result = response.choices[0].message.content
    cache.set(prompt, model, result)
    return result

# First call: API request
response1 = cached_api_call("What is AI?")

# Second call: Cached (free!)
response2 = cached_api_call("What is AI?")

5. Cost Tracking

import time
from dataclasses import dataclass
from typing import List

@dataclass
class APICall:
    timestamp: float
    model: str
    input_tokens: int
    output_tokens: int
    cost: float

class CostTracker:
    """Track API usage and costs"""

    # Pricing per 1M tokens
    PRICING = {
        "gpt-4-turbo-preview": {"input": 10, "output": 30},
        "gpt-3.5-turbo": {"input": 0.5, "output": 1.5},
        "claude-3-5-sonnet-20241022": {"input": 3, "output": 15},
    }

    def __init__(self):
        self.calls: List[APICall] = []

    def log_call(self, model: str, input_tokens: int, output_tokens: int):
        """Log an API call and calculate cost"""
        pricing = self.PRICING.get(model, {"input": 10, "output": 30})

        cost = (
            (input_tokens * pricing["input"] / 1_000_000) +
            (output_tokens * pricing["output"] / 1_000_000)
        )

        call = APICall(
            timestamp=time.time(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost
        )
        self.calls.append(call)
        return cost

    def get_total_cost(self) -> float:
        """Get total cost across all calls"""
        return sum(call.cost for call in self.calls)

    def get_cost_by_model(self) -> dict:
        """Get cost breakdown by model"""
        breakdown = {}
        for call in self.calls:
            if call.model not in breakdown:
                breakdown[call.model] = 0
            breakdown[call.model] += call.cost
        return breakdown

# Usage
tracker = CostTracker()

# Make API call and track cost
response = client.chat.completions.create(
    model="gpt-4-turbo-preview",
    messages=[{"role": "user", "content": "Hello"}]
)

cost = tracker.log_call(
    model="gpt-4-turbo-preview",
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens
)

print(f"Call cost: ${cost:.6f}")
print(f"Total cost today: ${tracker.get_total_cost():.6f}")

5. Monitoring & Logging

Observability
Observability is the ability to understand your application's internal state from external outputs. For AI applications, this means logging prompts, responses, latencies, costs, and errors to detect issues and optimize performance.

Comprehensive Logging

import logging
import time
import json
from typing import Dict, Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('ai_api.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def log_api_call(
    model: str,
    prompt: str,
    response: str,
    latency: float,
    tokens: Dict[str, int],
    cost: float,
    error: str = None
):
    """Log comprehensive API call information"""

    log_data = {
        "timestamp": time.time(),
        "model": model,
        "prompt_length": len(prompt),
        "response_length": len(response) if response else 0,
        "latency_ms": round(latency * 1000, 2),
        "input_tokens": tokens.get("input", 0),
        "output_tokens": tokens.get("output", 0),
        "cost_usd": round(cost, 6),
        "error": error
    }

    if error:
        logger.error(f"API call failed: {json.dumps(log_data)}")
    else:
        logger.info(f"API call succeeded: {json.dumps(log_data)}")

def monitored_api_call(prompt: str, model: str = "gpt-4-turbo-preview"):
    """Make API call with comprehensive monitoring"""

    start_time = time.time()
    error = None
    response = None
    tokens = {"input": 0, "output": 0}

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        tokens = {
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        }

        result = response.choices[0].message.content

    except Exception as e:
        error = str(e)
        result = None

    finally:
        latency = time.time() - start_time

        # Calculate cost (this pricing table only covers the default model;
        # fall back to GPT-4 Turbo rates for anything else)
        pricing = {"gpt-4-turbo-preview": {"input": 10, "output": 30}}
        rates = pricing.get(model, {"input": 10, "output": 30})
        cost = (
            (tokens["input"] * rates["input"] / 1_000_000) +
            (tokens["output"] * rates["output"] / 1_000_000)
        )

        # Log everything
        log_api_call(
            model=model,
            prompt=prompt,
            response=result or "",
            latency=latency,
            tokens=tokens,
            cost=cost,
            error=error
        )

    if error:
        raise Exception(error)

    return result

# Usage
response = monitored_api_call("Explain machine learning")

Performance Monitoring

from collections import defaultdict
from typing import List
import statistics

class PerformanceMonitor:
    """Monitor API performance metrics"""

    def __init__(self):
        self.latencies: List[float] = []
        self.errors_by_type = defaultdict(int)
        self.calls_by_model = defaultdict(int)

    def record_latency(self, latency: float):
        """Record API call latency"""
        self.latencies.append(latency)

    def record_error(self, error_type: str):
        """Record error occurrence"""
        self.errors_by_type[error_type] += 1

    def record_call(self, model: str):
        """Record API call by model"""
        self.calls_by_model[model] += 1

    def get_stats(self) -> dict:
        """Get performance statistics"""
        if not self.latencies:
            return {"message": "No data yet"}

        # Errors are not in self.latencies, so include them in the totals
        total_calls = len(self.latencies) + sum(self.errors_by_type.values())

        return {
            "total_calls": total_calls,
            "avg_latency_ms": round(statistics.mean(self.latencies) * 1000, 2),
            "p50_latency_ms": round(statistics.median(self.latencies) * 1000, 2),
            "p95_latency_ms": round(
                sorted(self.latencies)[int(len(self.latencies) * 0.95)] * 1000, 2
            ),
            "error_rate": sum(self.errors_by_type.values()) / total_calls,
            "errors_by_type": dict(self.errors_by_type),
            "calls_by_model": dict(self.calls_by_model)
        }

# Usage
monitor = PerformanceMonitor()

start = time.time()
try:
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": "Hello"}]
    )
    monitor.record_latency(time.time() - start)
    monitor.record_call("gpt-4-turbo-preview")
except Exception as e:
    monitor.record_error(type(e).__name__)

print(monitor.get_stats())

6. Production Deployment Patterns

Pattern 1: Asynchronous Processing

import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def process_batch(prompts: List[str]) -> List[str]:
    """Process multiple prompts concurrently"""

    async def process_one(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

    # Process all prompts concurrently
    tasks = [process_one(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = [
    "Summarize article 1",
    "Summarize article 2",
    "Summarize article 3"
]

results = asyncio.run(process_batch(prompts))
# Much faster than sequential processing!
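
One caveat: asyncio.gather with no cap fires every request at once, which can blow through the rate limits from Section 2. A semaphore is a simple way to bound in-flight requests; the limit of 5 below is an illustrative assumption, not a provider requirement.

import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def process_batch_bounded(prompts: List[str], max_concurrent: int = 5) -> List[str]:
    """Process prompts concurrently, but never more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(prompt: str) -> str:
        async with semaphore:  # Wait here if too many requests are in flight
            response = await client.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content

    return await asyncio.gather(*(process_one(p) for p in prompts))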

Pattern 2: Queue-Based Processing

from queue import Queue, Empty
from threading import Thread
import time

class APIWorker:
    """Background worker for processing API requests"""

    def __init__(self, num_workers: int = 3):
        self.queue = Queue()
        self.workers = []
        self.running = True

        for _ in range(num_workers):
            worker = Thread(target=self._worker_loop)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        """Worker thread loop"""
        while self.running:
            try:
                task = self.queue.get(timeout=1)
                if task is None:
                    break

                prompt, callback = task

                # Process request
                try:
                    response = client.chat.completions.create(
                        model="gpt-4-turbo-preview",
                        messages=[{"role": "user", "content": prompt}]
                    )
                    result = response.choices[0].message.content
                    callback(result, None)
                except Exception as e:
                    callback(None, str(e))

                self.queue.task_done()
            except Empty:
                # Queue was empty within the timeout; loop and check again
                continue

    def submit(self, prompt: str, callback):
        """Submit a request to the queue"""
        self.queue.put((prompt, callback))

    def shutdown(self):
        """Shutdown all workers"""
        self.running = False
        for _ in self.workers:
            self.queue.put(None)

# Usage
worker = APIWorker(num_workers=3)

def on_complete(result, error):
    if error:
        print(f"Error: {error}")
    else:
        print(f"Result: {result[:100]}...")

worker.submit("Explain AI", on_complete)
worker.submit("Explain ML", on_complete)
worker.submit("Explain DL", on_complete)

# Wait for completion
time.sleep(10)
worker.shutdown()

Pattern 3: Circuit Breaker

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Circuit breaker pattern for API resilience"""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker entering HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN - API is failing")

        try:
            result = func(*args, **kwargs)

            # Success - reset if in HALF_OPEN
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print("Circuit breaker CLOSED - API recovered")

            return result

        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker OPEN after {self.failure_count} failures")

            raise

# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def api_call():
    return client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": "Hello"}]
    )

try:
    response = breaker.call(api_call)
except Exception as e:
    print(f"Call failed: {e}")

✅ Key Takeaways

  • Never hardcode API keys - use environment variables or secret managers
  • Implement client-side rate limiting to avoid 429 errors
  • Use exponential backoff for transient errors, but don't retry permanent errors
  • Choose the cheapest model that meets your requirements (GPT-3.5 is 20x cheaper than GPT-4)
  • Cache responses when appropriate to eliminate API costs
  • Track costs, latencies, and errors in production
  • Use async processing for better throughput
  • Implement circuit breakers for resilience

📚 Further Reading