Lab Overview
In this advanced lab, you'll build a complete multi-agent system that showcases the power of collaborative AI. You'll start with simple ReAct agents, progress through LangGraph state machines, and culminate in a specialized team of agents working together with memory and production-grade APIs.
Final System Architecture
Each agent has specialized tools, persistent memory, and the ability to collaborate through a shared state machine
What You'll Build
- Exercise 1: Simple ReAct agent with tool use (search, calculator)
- Exercise 2: LangGraph state machine with complex workflows
- Exercise 3: Multi-agent system using CrewAI (researcher, writer, critic)
- Exercise 4: Agent memory system with vector storage
- Exercise 5: Production FastAPI service with streaming
Prerequisites
- Python 3.10+ installed
- OpenAI API key ($5-10 budget for this lab)
- SerpAPI key (free tier available)
- Basic understanding of async Python
- Terminal/command line proficiency
Tips for Success
- Start with smaller test queries before running expensive multi-agent workflows
- Monitor your API costs throughout the lab
- Read all instructions before starting each exercise
- Save your work frequently (agents can be unpredictable!)
- Review the expected outputs before testing
Build a Simple ReAct Agent
Learn the fundamentals of agentic AI by implementing a ReAct (Reasoning + Acting) agent with LangChain. Your agent will use external tools (web search and calculator) to answer questions it couldn't solve alone.
Objectives
Step 1: Environment Setup
First, install the required packages and set up your API keys:
# Install dependencies
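# langchain-community provides SerpAPIWrapper; langchainhub backs hub.pull() on older LangChain releases
pip install langchain langchain-openai langchain-community langchainhub python-dotenv google-search-results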
# Create .env file
cat > .env << 'EOF'
OPENAI_API_KEY=your_openai_api_key_here
SERPAPI_API_KEY=your_serpapi_key_here
EOF
Step 2: Create the ReAct Agent
Create exercise1_react_agent.py with the following code:
"""
Exercise 1: Simple ReAct Agent
Implements Reasoning + Acting pattern with tools
"""

import os
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain import hub

# Load environment variables
load_dotenv()


def create_calculator_tool():
    """Simple calculator tool for mathematical operations"""

    def calculate(expression: str) -> str:
        """Evaluate a mathematical expression. Input should be a valid Python expression."""
        try:
            # Caution: eval() with empty builtins blocks the obvious attacks but is
            # not a real sandbox. Acceptable for this lab, not for untrusted input.
            result = eval(expression, {"__builtins__": {}}, {})
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"

    return Tool(
        name="Calculator",
        func=calculate,
        description="Useful for mathematical calculations. Input should be a valid Python expression like '2 + 2' or '(15 * 7) / 3'"
    )


def create_search_tool():
    """Web search tool using SerpAPI"""
    search = SerpAPIWrapper()
    return Tool(
        name="Search",
        func=search.run,
        description="Useful for searching the internet for current information. Input should be a search query."
    )


def create_react_agent_executor():
    """Create and configure the ReAct agent"""
    # Initialize LLM
    llm = ChatOpenAI(
        model="gpt-4",
        temperature=0,
        verbose=True
    )

    # Create tools
    tools = [
        create_search_tool(),
        create_calculator_tool()
    ]

    # Get the ReAct prompt template from LangChain hub
    prompt = hub.pull("hwchase17/react")

    # Create the agent
    agent = create_react_agent(llm, tools, prompt)

    # Create executor
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5
    )
    return agent_executor


def main():
    """Test the ReAct agent with sample queries"""
    print("=" * 80)
    print("EXERCISE 1: ReAct Agent with Tools")
    print("=" * 80)

    agent = create_react_agent_executor()

    # Test queries that require different tools
    test_queries = [
        "What is the current population of Tokyo?",
        "Calculate the result of (1234 + 5678) * 3",
        "Who won the 2023 FIFA Women's World Cup and what was the final score?"
    ]

    for i, query in enumerate(test_queries, 1):
        print(f"\n{'=' * 80}")
        print(f"Query {i}: {query}")
        print(f"{'=' * 80}\n")
        try:
            result = agent.invoke({"input": query})
            print(f"\nFinal Answer: {result['output']}")
        except Exception as e:
            print(f"\nError: {str(e)}")

    print("\n" + "=" * 80)


if __name__ == "__main__":
    main()
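The Calculator tool in the listing above evaluates its input with eval(). Restricting builtins does not turn eval() into a sandbox, so a stricter alternative may be worth considering once input comes from users rather than from your own test queries. The sketch below is one possible approach, not part of the original lab: it walks the expression's syntax tree and allows only numeric literals and basic arithmetic. The names safe_calculate and _eval_node are hypothetical.

```python
import ast
import operator

# Whitelisted operators; any other syntax element is rejected
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def _eval_node(node):
    """Recursively evaluate a whitelisted arithmetic syntax tree."""
    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        return _BINARY_OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval_node(node.operand))
    raise ValueError(f"Unsupported expression element: {type(node).__name__}")


def safe_calculate(expression: str) -> str:
    """Drop-in replacement for calculate(): same string-in, string-out contract."""
    try:
        tree = ast.parse(expression, mode="eval")
        return f"Result: {_eval_node(tree.body)}"
    except Exception as e:
        return f"Error: {e}"


print(safe_calculate("(1234 + 5678) * 3"))
print(safe_calculate("__import__('os').system('ls')"))
```

Because the function keeps the same signature as calculate, it could be passed as func= to the Tool without other changes. Exponentiation is deliberately left out of the whitelist, since very large powers can stall the process.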
Step 3: Run and Observe
Execute the agent and watch the ReAct loop in action:
python exercise1_react_agent.py
Thought: I need to search for the current population of Tokyo
Action: Search
Action Input: "Tokyo population 2024"
Observation: Tokyo has a population of approximately 14 million...
Thought: I now know the final answer
Final Answer: The current population of Tokyo is approximately 14 million people...
The agent is following the ReAct pattern:
- Thought: Agent reasons about what to do
- Action: Agent selects a tool to use
- Action Input: Agent provides input to the tool
- Observation: Agent receives tool output
- Repeat or Answer: Loop continues until agent has final answer
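To make the loop above concrete, here is a minimal sketch of what an executor does with each model response. It is a simplified illustration, not LangChain's implementation: parse_react_step, run_react_loop, and scripted_llm are hypothetical names, and the "LLM" is a scripted stand-in so the example runs without an API key.

```python
import re

# Matches the "Action: ... / Action Input: ..." pair emitted by a ReAct prompt
ACTION_PATTERN = re.compile(r"Action:\s*(.+?)\s*\nAction Input:\s*(.+)", re.DOTALL)


def parse_react_step(llm_text):
    """Return ("final", None, answer) or ("action", tool_name, tool_input)."""
    if "Final Answer:" in llm_text:
        return "final", None, llm_text.split("Final Answer:", 1)[1].strip()
    match = ACTION_PATTERN.search(llm_text)
    if match is None:
        raise ValueError("Could not find an Action / Action Input pair")
    tool_name = match.group(1).strip()
    tool_input = match.group(2).strip().strip('"')
    return "action", tool_name, tool_input


def run_react_loop(llm, tools, question, max_iterations=5):
    """Alternate between model output and tool calls until a final answer appears."""
    scratchpad = f"Question: {question}\n"
    for _ in range(max_iterations):
        text = llm(scratchpad)
        kind, tool_name, payload = parse_react_step(text)
        if kind == "final":
            return payload
        observation = tools[tool_name](payload)
        # Feed the tool result back so the next model call can reason over it
        scratchpad += f"{text}\nObservation: {observation}\n"
    return "Stopped: iteration limit reached"


def scripted_llm(scratchpad):
    """Stand-in for a real model: first asks for a tool, then answers."""
    if "Observation:" not in scratchpad:
        return "Thought: I need to multiply.\nAction: Calculator\nAction Input: 6 * 7"
    return "Thought: I now know the final answer\nFinal Answer: 42"


def multiply(expr):
    """Toy tool that handles inputs of the form 'a * b'."""
    left, right = expr.split("*")
    return str(int(left) * int(right))


answer = run_react_loop(scripted_llm, {"Calculator": multiply}, "What is 6 * 7?")
print(answer)
```

The max_iterations guard plays the same role as the max_iterations=5 setting on the AgentExecutor: it stops a model that never produces a final answer.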
Bonus Challenge
Add a custom "Wikipedia" tool that fetches article summaries. Test it with queries like "What is quantum computing according to Wikipedia?"
Deliverable
A working ReAct agent that can:
- Search the web for current information
- Perform mathematical calculations
- Chain multiple tool calls together
- Provide reasoning for its actions
Build a LangGraph State Machine
Move beyond linear agent execution to create complex workflows with conditional logic, loops, and state management using LangGraph.
Objectives
Step 1: Install LangGraph
pip install langgraph
Step 2: Design the Workflow
Research Workflow Graph
If quality check fails, loop back to Research node
Step 3: Implement the State Machine
Create exercise2_langgraph.py:
"""
Exercise 2: LangGraph State Machine
Implements a complex research workflow with conditional logic
"""

import operator
import re
from typing import Annotated, List, TypedDict

from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

# Load OPENAI_API_KEY from the .env file created in Exercise 1
load_dotenv()


# Define the state schema
class ResearchState(TypedDict):
    """State passed between nodes in the graph"""
    topic: str
    research_plan: str
    research_results: Annotated[List[str], operator.add]  # Accumulates results
    quality_score: int
    final_report: str
    iteration_count: int


# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)


# Each node returns only the fields it changes. Returning the whole state would
# pass research_results through the operator.add reducer again and duplicate it.

def plan_research(state: ResearchState) -> dict:
    """Node: Create a research plan"""
    print("\nPLANNING PHASE")
    print(f"Topic: {state['topic']}")

    messages = [
        SystemMessage(content="You are a research planner. Create a structured research plan."),
        HumanMessage(content=f"Create a research plan for: {state['topic']}\n\nProvide 3-4 key areas to investigate.")
    ]
    response = llm.invoke(messages)

    print(f"Plan created: {len(response.content)} characters")
    return {"research_plan": response.content, "iteration_count": 0}


def conduct_research(state: ResearchState) -> dict:
    """Node: Conduct research based on plan"""
    print("\nRESEARCH PHASE")

    messages = [
        SystemMessage(content="You are a research analyst. Gather detailed information."),
        HumanMessage(content=f"Research Plan:\n{state['research_plan']}\n\nProvide comprehensive findings for the topic: {state['topic']}")
    ]
    response = llm.invoke(messages)

    print(f"Research completed: {len(response.content)} characters")
    # Return only the new item; the operator.add reducer appends it to the list
    return {
        "research_results": [response.content],
        "iteration_count": state["iteration_count"] + 1,
    }


def check_quality(state: ResearchState) -> dict:
    """Node: Evaluate research quality"""
    print("\nQUALITY CHECK PHASE")

    messages = [
        SystemMessage(content="You are a quality evaluator. Rate research quality from 1-10."),
        HumanMessage(content=f"Evaluate this research:\n\n{state['research_results'][-1]}\n\nProvide a score (1-10) and brief reasoning.")
    ]
    response = llm.invoke(messages)

    # Extract the first standalone number from 1 to 10, so "8/10" parses as 8
    # (simple parsing - in production use structured output)
    match = re.search(r"\b(10|[1-9])\b", response.content)
    score = int(match.group(1)) if match else 7  # Default to passing score

    print(f"Quality score: {score}/10")
    return {"quality_score": score}


def synthesize_report(state: ResearchState) -> dict:
    """Node: Create final report"""
    print("\nSYNTHESIS PHASE")

    all_research = "\n\n---\n\n".join(state['research_results'])
    messages = [
        SystemMessage(content="You are a report writer. Synthesize research into a comprehensive report."),
        HumanMessage(content=f"Create a final report on: {state['topic']}\n\nBased on this research:\n\n{all_research}")
    ]
    response = llm.invoke(messages)

    print(f"Final report: {len(response.content)} characters")
    return {"final_report": response.content}


def should_continue_research(state: ResearchState) -> str:
    """Conditional edge: Decide if more research is needed"""
    # Continue if quality is low and we haven't iterated too many times
    if state['quality_score'] < 7 and state['iteration_count'] < 2:
        print(f"Quality below threshold ({state['quality_score']}/10), conducting additional research...")
        return "research"
    else:
        print(f"Quality acceptable ({state['quality_score']}/10), proceeding to synthesis")
        return "synthesize"


def create_research_graph():
    """Build the LangGraph workflow"""
    workflow = StateGraph(ResearchState)

    # Add nodes
    workflow.add_node("plan", plan_research)
    workflow.add_node("research", conduct_research)
    workflow.add_node("quality_check", check_quality)
    workflow.add_node("synthesize", synthesize_report)

    # Add edges
    workflow.set_entry_point("plan")
    workflow.add_edge("plan", "research")
    workflow.add_edge("research", "quality_check")

    # Conditional edge - can loop back to research
    workflow.add_conditional_edges(
        "quality_check",
        should_continue_research,
        {
            "research": "research",
            "synthesize": "synthesize"
        }
    )
    workflow.add_edge("synthesize", END)

    return workflow.compile()


def main():
    """Test the LangGraph workflow"""
    print("=" * 80)
    print("EXERCISE 2: LangGraph State Machine")
    print("=" * 80)

    # Create the graph
    app = create_research_graph()

    # Test with a research topic
    initial_state = {
        "topic": "The impact of GPT-4 on software development productivity",
        "research_plan": "",
        "research_results": [],
        "quality_score": 0,
        "final_report": "",
        "iteration_count": 0
    }

    # Run the workflow
    final_state = app.invoke(initial_state)

    # Display results
    print("\n" + "=" * 80)
    print("FINAL RESULTS")
    print("=" * 80)
    print(f"\nResearch Plan:\n{final_state['research_plan'][:200]}...")
    print(f"\nIterations: {final_state['iteration_count']}")
    print(f"\nFinal Quality Score: {final_state['quality_score']}/10")
    print(f"\nFinal Report:\n{final_state['final_report'][:500]}...")


if __name__ == "__main__":
    main()
Step 4: Run the Workflow
python exercise2_langgraph.py
PLANNING PHASE
Topic: The impact of GPT-4 on software development productivity
Plan created: 543 characters
RESEARCH PHASE
Research completed: 1247 characters
QUALITY CHECK PHASE
Quality score: 6/10
Quality below threshold (6/10), conducting additional research...
RESEARCH PHASE
Research completed: 1389 characters
QUALITY CHECK PHASE
Quality score: 8/10
Quality acceptable (8/10), proceeding to synthesis
SYNTHESIS PHASE
Final report: 2156 characters
- State: Shared dictionary passed between nodes
- Nodes: Functions that process and update state
- Edges: Define workflow progression
- Conditional Edges: Branch based on state values
- Annotations: Control how state fields are updated (e.g., operator.add)
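The last concept is the easiest to get wrong, so a small model of it may help. The sketch below imitates in plain Python how a graph runtime could merge a node's return value into shared state when one field carries an operator.add reducer. It is a simplified illustration, not LangGraph's actual code; apply_update and REDUCERS are hypothetical names.

```python
import operator

# Fields listed here are merged with their reducer; all others are overwritten
REDUCERS = {"research_results": operator.add}


def apply_update(state, update):
    """Merge a node's partial update into the current state."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"research_results": [], "quality_score": 0}

# A node that returns only its new item: the reducer appends it
state = apply_update(state, {"research_results": ["first pass"]})
state = apply_update(state, {"research_results": ["second pass"], "quality_score": 8})
print(state["research_results"])

# A node that returns the full state: the reducer appends the list to itself
duplicated = apply_update(state, state)
print(len(duplicated["research_results"]))
```

The second call shows why a node should return only the fields it changed: handing back a list that already contains earlier results makes the reducer add those results a second time.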
Bonus Challenge
Add a "human_review" node that requires human input before finalizing the report. Use input() to pause execution and collect feedback.
Deliverable
A LangGraph workflow that:
- Plans research based on a topic
- Conducts iterative research
- Evaluates quality automatically
- Loops back if quality is insufficient
- Synthesizes a final report
Build a Multi-Agent Research Team
Create a collaborative system of specialized agents using CrewAI. Three agents (Researcher, Writer, Critic) will work together to produce a polished research report.
Objectives
Step 1: Install CrewAI
pip install crewai crewai-tools
Step 2: Design Agent Roles
Multi-Agent Collaboration
- Researcher: Gathers facts & data
- Writer: Creates draft report
- Critic: Reviews & refines
Step 3: Implement Multi-Agent System
Create exercise3_crewai.py:
"""
Exercise 3: Multi-Agent Research Team with CrewAI
Three specialized agents collaborate on research tasks
"""

import os

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

# Load API keys from the .env file created in Exercise 1
load_dotenv()

# Initialize LLM (used by all agents)
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

# Initialize search tool
# Note: SerperDevTool reads SERPER_API_KEY (a Serper.dev key), which is a
# different service from the SerpAPI key used in Exercise 1
search_tool = SerperDevTool()


def create_researcher_agent():
    """Agent 1: Research Specialist"""
    return Agent(
        role="Senior Research Analyst",
        goal="Conduct comprehensive research and gather accurate, up-to-date information",
        backstory="""You are an experienced research analyst with a PhD in information science.
        You excel at finding relevant data, validating sources, and identifying key insights.
        You're meticulous about fact-checking and always cite your sources.""",
        tools=[search_tool],
        llm=llm,
        verbose=True,
        allow_delegation=False
    )


def create_writer_agent():
    """Agent 2: Content Writer"""
    return Agent(
        role="Technical Content Writer",
        goal="Transform research findings into clear, engaging, well-structured reports",
        backstory="""You are a skilled technical writer with 10 years of experience.
        You have a talent for explaining complex topics in accessible language.
        You structure information logically and write with clarity and precision.""",
        llm=llm,
        verbose=True,
        allow_delegation=False
    )


def create_critic_agent():
    """Agent 3: Quality Critic"""
    return Agent(
        role="Senior Quality Critic",
        goal="Review and improve reports for accuracy, clarity, and completeness",
        backstory="""You are a veteran editor and quality assurance specialist.
        You have a keen eye for inconsistencies, gaps in logic, and areas for improvement.
        You provide constructive feedback that elevates the quality of any document.""",
        llm=llm,
        verbose=True,
        allow_delegation=False
    )


def create_research_task(researcher_agent, topic):
    """Task 1: Conduct Research"""
    return Task(
        description=f"""Conduct comprehensive research on: {topic}
        Your research should include:
        1. Current state and recent developments
        2. Key statistics and data points
        3. Expert opinions and analysis
        4. Practical implications and applications
        5. Future trends and predictions
        Gather information from reliable sources and organize your findings clearly.""",
        expected_output="""A detailed research report with:
        - Executive summary
        - Key findings organized by theme
        - Supporting data and statistics
        - Source citations
        - Minimum 800 words""",
        agent=researcher_agent
    )


def create_writing_task(writer_agent, topic):
    """Task 2: Write Report (depends on research)"""
    return Task(
        description=f"""Using the research findings, write a comprehensive report on: {topic}
        Your report should:
        1. Open with an engaging introduction
        2. Present information in a logical flow
        3. Use clear headings and sections
        4. Include concrete examples
        5. Conclude with key takeaways
        Write for a technical audience but maintain clarity.""",
        expected_output="""A polished report with:
        - Executive summary (150 words)
        - Main body (1000-1500 words)
        - Conclusion with actionable insights
        - Professional formatting
        - Clear section headings""",
        agent=writer_agent,
        context=[]  # Will be populated with previous task output
    )


def create_critique_task(critic_agent, topic):
    """Task 3: Review and Refine (depends on writing)"""
    return Task(
        description=f"""Review the report on {topic} and provide your refined version.
        Evaluate:
        1. Accuracy of information
        2. Clarity of writing
        3. Logical flow and structure
        4. Completeness of coverage
        5. Overall quality and impact
        Identify any issues and provide an improved final version.""",
        expected_output="""A final report that includes:
        - Any corrections to factual errors
        - Improved clarity and readability
        - Enhanced structure if needed
        - A brief editor's note on changes made
        - Publication-ready quality""",
        agent=critic_agent,
        context=[]  # Will be populated with previous task output
    )


def create_research_crew(topic: str):
    """Assemble the multi-agent crew"""
    # Create agents
    researcher = create_researcher_agent()
    writer = create_writer_agent()
    critic = create_critic_agent()

    # Create tasks
    research_task = create_research_task(researcher, topic)
    writing_task = create_writing_task(writer, topic)
    critique_task = create_critique_task(critic, topic)

    # Set task dependencies (context)
    writing_task.context = [research_task]
    critique_task.context = [research_task, writing_task]

    # Create crew
    crew = Crew(
        agents=[researcher, writer, critic],
        tasks=[research_task, writing_task, critique_task],
        process=Process.sequential,  # Tasks execute in order
        verbose=True  # Show agent interactions (recent CrewAI releases expect a bool here)
    )
    return crew


def main():
    """Test the multi-agent research team"""
    print("=" * 80)
    print("EXERCISE 3: Multi-Agent Research Team")
    print("=" * 80)

    # Define research topic
    topic = "The state of autonomous AI agents in 2024: capabilities, limitations, and future directions"
    print(f"\nResearch Topic: {topic}\n")
    print("Assembling 3-agent research team...\n")

    # Create and run crew
    crew = create_research_crew(topic)
    print("\nStarting collaborative research process...\n")
    result = crew.kickoff()

    # Display final result
    print("\n" + "=" * 80)
    print("FINAL RESEARCH REPORT")
    print("=" * 80)
    print(result)

    # Save to file; str() covers releases where kickoff() returns an output object
    with open("multi_agent_report.md", "w") as f:
        f.write(f"# Research Report: {topic}\n\n")
        f.write(str(result))
    print("\nReport saved to: multi_agent_report.md")


if __name__ == "__main__":
    main()
This exercise runs three GPT-4 agents, each making multiple LLM calls. Expected cost: $1-3 per run. While testing, start with a simpler topic or switch the model to gpt-3.5-turbo.
Step 4: Run Multi-Agent System
python exercise3_crewai.py
Agent: Senior Research Analyst
Thought: I need to search for recent information about AI agents...
Tool: Search
Output: [Research findings...]
Agent: Technical Content Writer
Thought: I'll structure this research into a comprehensive report...
Output: [Draft report...]
Agent: Senior Quality Critic
Thought: I'll review for accuracy and clarity...
Changes: [Improvements made...]
Output: [Final polished report...]
- Roles: Each agent has a specific identity and expertise
- Goals: Clear objectives guide agent behavior
- Backstory: Gives context that shapes responses
- Tasks: Concrete deliverables with expected outputs
- Context: Tasks can access outputs from previous tasks
- Process: Sequential ensures proper handoffs
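The Context and Process points can be pictured with a small model in plain Python. This is a conceptual sketch only, not CrewAI's implementation: SketchTask and run_sequential are hypothetical names, and the lambdas stand in for agents that would normally call an LLM.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class SketchTask:
    name: str
    run: Callable[[str], str]  # receives the joined outputs of its context tasks
    context: List["SketchTask"] = field(default_factory=list)
    output: str = ""


def run_sequential(tasks):
    """Run tasks in list order, handing each one the outputs it depends on."""
    for task in tasks:
        context_text = "\n".join(dep.output for dep in task.context)
        task.output = task.run(context_text)
    return tasks[-1].output


research = SketchTask("research", lambda ctx: "FACTS")
writing = SketchTask("writing", lambda ctx: f"DRAFT({ctx})", context=[research])
critique = SketchTask("critique", lambda ctx: f"FINAL({ctx})", context=[research, writing])

final = run_sequential([research, writing, critique])
print(final)
```

Sequential order matters because a task can only read outputs that already exist: if critique ran before writing, its context would contain an empty draft.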
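Bonus Challenge
Add a fourth agent: a "Fact Checker" that validates all claims before the Critic reviews. To have it work alongside the Writer, consider marking its task with async_execution=True; if you coordinate with Process.hierarchical instead, note that the Crew also needs a manager (for example via manager_llm).
Deliverable
A multi-agent system that: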
- Has three specialized agents with distinct roles
- Executes tasks sequentially with proper handoffs
- Produces a publication-quality research report
- Saves output to a markdown file
Implement Agent Memory System
Add short-term and long-term memory to agents, enabling them to remember past interactions, learn from previous tasks, and maintain context across sessions.
Objectives
Step 1: Install Memory Dependencies
pip install chromadb langchain-community
Step 2: Implement Memory System
Create exercise4_agent_memory.py:
"""
Exercise 4: Agent Memory System
Implements short-term and long-term memory for agents
"""

import os
from datetime import datetime
from typing import Dict, List, Optional

from dotenv import load_dotenv
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# Load OPENAI_API_KEY from the .env file created in Exercise 1
load_dotenv()


class AgentMemory:
    """Manages short-term and long-term memory for an agent"""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.7)

        # Short-term memory: Recent conversation
        # memory_key must be "history": ConversationChain's default prompt
        # expects the variables "history" and "input"
        self.short_term = ConversationBufferMemory(
            memory_key="history",
            return_messages=True
        )

        # Long-term memory: Vector store for persistent knowledge
        self.embeddings = OpenAIEmbeddings()
        self.long_term = Chroma(
            collection_name=f"agent_memory_{agent_name}",
            embedding_function=self.embeddings,
            persist_directory=f"./agent_memory_{agent_name}"
        )

        # Summary memory: Condensed conversation history
        self.summary = ConversationSummaryMemory(
            llm=self.llm,
            memory_key="summary"
        )

    def remember_short_term(self, user_input: str, agent_output: str):
        """Store recent interaction in short-term memory"""
        self.short_term.save_context(
            {"input": user_input},
            {"output": agent_output}
        )
        # Also update summary
        self.summary.save_context(
            {"input": user_input},
            {"output": agent_output}
        )

    def remember_long_term(self, content: str, metadata: Optional[Dict] = None):
        """Store important information in long-term vector memory"""
        # Copy so the caller's dict is not mutated
        metadata = dict(metadata or {})
        metadata.update({
            "agent": self.agent_name,
            "timestamp": datetime.now().isoformat()
        })
        doc = Document(page_content=content, metadata=metadata)
        self.long_term.add_documents([doc])

    def recall_relevant(self, query: str, k: int = 3) -> List[str]:
        """Retrieve relevant memories from long-term storage"""
        results = self.long_term.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def get_short_term_context(self) -> Dict:
        """Get recent conversation history (dict keyed by the memory_key)"""
        return self.short_term.load_memory_variables({})

    def get_summary(self) -> Dict:
        """Get condensed summary of all interactions (dict keyed by "summary")"""
        return self.summary.load_memory_variables({})

    def forget_short_term(self):
        """Clear short-term memory (start fresh conversation)"""
        self.short_term.clear()
        self.summary.clear()


class MemoryEnabledAgent:
    """Agent with memory capabilities"""

    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.memory = AgentMemory(name)
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.7)

        # Create conversation chain with memory
        self.chain = ConversationChain(
            llm=self.llm,
            memory=self.memory.short_term,
            verbose=True
        )

    def process(self, user_input: str, use_long_term: bool = True) -> str:
        """Process input with memory context"""
        # Retrieve relevant long-term memories
        context = ""
        if use_long_term:
            relevant_memories = self.memory.recall_relevant(user_input)
            if relevant_memories:
                context = "Relevant past knowledge:\n" + "\n".join(f"- {mem}" for mem in relevant_memories)

        # Generate response using chain (includes short-term memory)
        full_input = f"{context}\n\nCurrent task: {user_input}" if context else user_input
        response = self.chain.predict(input=full_input)

        # Store important insights in long-term memory
        if len(response) > 100:  # Only store substantial responses
            self.memory.remember_long_term(
                content=f"User: {user_input}\nAgent: {response}",
                metadata={"type": "interaction", "role": self.role}
            )
        return response

    def learn_fact(self, fact: str, category: str = "knowledge"):
        """Explicitly teach the agent a fact"""
        self.memory.remember_long_term(
            content=fact,
            metadata={"type": "fact", "category": category}
        )
        print(f"{self.name} learned: {fact[:100]}...")

    def show_memory_summary(self):
        """Display memory status"""
        print(f"\n{'='*60}")
        print(f"Memory Status for {self.name}")
        print(f"{'='*60}")

        # Short-term
        short_term = self.memory.get_short_term_context()
        print("\nRecent Conversation:")
        if short_term.get('history'):
            print(f"  Messages: {len(short_term['history'])}")
        else:
            print("  (empty)")

        # Summary
        summary = self.memory.get_summary()
        if summary.get('summary'):
            print("\nConversation Summary:")
            print(f"  {summary['summary']}")

        # Long-term (uses Chroma's private _collection handle to count entries)
        total_memories = self.memory.long_term._collection.count()
        print("\nLong-term Memory:")
        print(f"  Total memories: {total_memories}")


def main():
    """Test agent memory system"""
    print("=" * 80)
    print("EXERCISE 4: Agent Memory System")
    print("=" * 80)

    # Create agent with memory
    agent = MemoryEnabledAgent(
        name="ResearchBot",
        role="Research Assistant"
    )

    print("\nTeaching agent some facts...\n")
    # Teach the agent domain knowledge
    agent.learn_fact(
        "LangGraph is a framework for building stateful, cyclic multi-agent workflows using a graph-based architecture.",
        category="frameworks"
    )
    agent.learn_fact(
        "CrewAI focuses on role-based agent collaboration with simple sequential or hierarchical processes.",
        category="frameworks"
    )
    agent.learn_fact(
        "ReAct (Reasoning + Acting) is a pattern where agents alternate between thinking and taking actions with tools.",
        category="patterns"
    )

    print("\nStarting conversation...\n")
    # Conversation 1
    response1 = agent.process("What is LangGraph?")
    print(f"\nAgent: {response1}\n")

    # Conversation 2 (agent should remember context)
    response2 = agent.process("How is it different from CrewAI?")
    print(f"\nAgent: {response2}\n")

    # Conversation 3 (testing long-term memory retrieval)
    response3 = agent.process("Explain the ReAct pattern and how it relates to what we discussed")
    print(f"\nAgent: {response3}\n")

    # Show memory status
    agent.show_memory_summary()

    print("\n" + "=" * 80)
    print("Testing Memory Persistence")
    print("=" * 80)

    # Clear short-term, but long-term persists
    print("\nClearing short-term memory...\n")
    agent.memory.forget_short_term()

    # New conversation should still access long-term knowledge
    response4 = agent.process("Tell me what you know about agent frameworks")
    print(f"\nAgent (new session): {response4}\n")
    agent.show_memory_summary()

    print("\nMemory system test complete!")
    print("\nKey observations:")
    print("  - Agent remembers facts across sessions (long-term)")
    print("  - Agent maintains conversation context (short-term)")
    print("  - Agent can summarize lengthy conversations")
    print("  - Memories persist even after clearing short-term buffer")


if __name__ == "__main__":
    main()
Step 3: Test Memory System
python exercise4_agent_memory.py
Teaching agent some facts...
ResearchBot learned: LangGraph is a framework for building stateful...
ResearchBot learned: CrewAI focuses on role-based agent collaboration...
Starting conversation...
Agent: LangGraph is a framework designed for building stateful...
Agent: LangGraph differs from CrewAI in several key ways...
Recent Conversation:
Messages: 6
Long-term Memory:
Total memories: 5
- Short-term (Buffer): Full message history of the current conversation, cleared when a new session starts
- Summary: Condensed version of all interactions (saves tokens)
- Long-term (Vector): Semantic search over all past knowledge
- Persistence: Long-term survives restarts, short-term doesn't
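The difference between these memory types can be shown without any external service. The sketch below is a toy model under loud assumptions: word counts stand in for real embeddings, a Python list stands in for the vector store, and ToyMemory, embed, and cosine are hypothetical names that do not appear in the exercise code.

```python
import math
from collections import Counter


def embed(text):
    """Toy embedding: lowercase word counts (a real system calls an embedding model)."""
    return Counter(text.lower().split())


def cosine(a, b):
    """Cosine similarity between two word-count vectors."""
    dot = sum(a[word] * b[word] for word in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyMemory:
    def __init__(self):
        self.short_term = []  # current conversation only
        self.long_term = []   # survives new sessions

    def remember(self, text):
        self.short_term.append(text)
        self.long_term.append(text)

    def new_session(self):
        """Drop the conversation buffer; long-term entries stay."""
        self.short_term.clear()

    def recall(self, query, k=1):
        """Return the k long-term entries most similar to the query."""
        query_vec = embed(query)
        ranked = sorted(self.long_term, key=lambda t: cosine(query_vec, embed(t)), reverse=True)
        return ranked[:k]


memory = ToyMemory()
memory.remember("LangGraph builds stateful graph workflows")
memory.remember("CrewAI coordinates role-based agents")
memory.remember("ReAct alternates reasoning and acting")

memory.new_session()
print(memory.short_term)
print(memory.recall("what is langgraph"))
```

After new_session() the buffer is empty, yet recall still finds the LangGraph entry, which mirrors what the exercise demonstrates when it clears short-term memory and queries the agent again.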
Bonus Challenge
Implement "importance scoring" for memories. Only store memories that exceed a certain importance threshold (as judged by the LLM). Add a forget_old_memories() method that removes low-importance memories older than 30 days.
Deliverable
An agent memory system with:
- Short-term conversation buffer
- Long-term vector storage
- Automatic memory summarization
- Semantic memory retrieval
- Persistence across sessions
Deploy Production Agent API
Build a production-ready FastAPI service that exposes your agents as RESTful endpoints with streaming responses, error handling, and monitoring.
Objectives
Step 1: Install FastAPI Dependencies
pip install fastapi uvicorn pydantic
Step 2: Create Production API
Create exercise5_agent_api.py:
"""
Exercise 5: Production Agent API
FastAPI service for deploying agents with streaming, error handling, and monitoring
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import asyncio
import logging
from datetime import datetime
import uuid
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain import hub
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI
app = FastAPI(
title="Agent API",
description="Production-ready API for autonomous AI agents",
version="1.0.0"
)
# Request/Response Models
class AgentRequest(BaseModel):
query: str = Field(..., description="The user's query for the agent")
stream: bool = Field(False, description="Enable streaming responses")
max_iterations: int = Field(5, description="Maximum agent iterations")
class AgentResponse(BaseModel):
request_id: str
query: str
output: str
iterations: int
tools_used: List[str]
timestamp: str
class HealthResponse(BaseModel):
status: str
timestamp: str
uptime_seconds: float
# Global state
start_time = datetime.now()
request_count = 0
agent_cache = {}
# Agent initialization
def get_agent_executor():
"""Initialize or retrieve cached agent"""
global agent_cache
if "main_agent" in agent_cache:
return agent_cache["main_agent"]
logger.info("Initializing agent executor...")
# LLM
llm = ChatOpenAI(model="gpt-4", temperature=0, verbose=True)
# Tools
search = SerpAPIWrapper()
tools = [
Tool(
name="Search",
func=search.run,
description="Search the internet for information"
),
Tool(
name="Calculator",
func=lambda x: str(eval(x, {"__builtins__": {}}, {})),
description="Perform calculations"
)
]
# Agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
handle_parsing_errors=True,
max_iterations=5
)
agent_cache["main_agent"] = executor
logger.info("Agent initialized successfully")
return executor
# Endpoints
@app.get("/", tags=["Root"])
async def root():
"""Root endpoint"""
return {
"message": "Agent API is running",
"docs": "/docs",
"health": "/health"
}
@app.get("/health", response_model=HealthResponse, tags=["Monitoring"])
async def health_check():
"""Health check endpoint"""
uptime = (datetime.now() - start_time).total_seconds()
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"uptime_seconds": uptime
}
@app.post("/agent/query", response_model=AgentResponse, tags=["Agent"])
async def query_agent(request: AgentRequest, background_tasks: BackgroundTasks):
"""
Execute agent query (non-streaming)
This endpoint runs the agent and returns the complete response.
"""
global request_count
request_count += 1
request_id = str(uuid.uuid4())
logger.info(f"[{request_id}] Received query: {request.query}")
try:
# Get agent
agent = get_agent_executor()
agent.max_iterations = request.max_iterations
# Execute
result = agent.invoke({"input": request.query})
# Extract metadata
iterations = len(result.get("intermediate_steps", []))
tools_used = [
step[0].tool for step in result.get("intermediate_steps", [])
]
logger.info(f"[{request_id}] Query completed: {iterations} iterations, {len(tools_used)} tool calls")
return AgentResponse(
request_id=request_id,
query=request.query,
output=result["output"],
iterations=iterations,
tools_used=tools_used,
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"[{request_id}] Error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))


import json  # needed for well-formed SSE payloads (skip if already imported above)


def sse_event(payload: dict) -> str:
    """Format a payload as a single Server-Sent Event carrying JSON."""
    return f"data: {json.dumps(payload)}\n\n"


async def stream_agent_response(query: str, max_iterations: int):
    """Generate streaming agent response"""
    agent = get_agent_executor()
    agent.max_iterations = max_iterations
    try:
        # Simulate streaming (in production, use actual streaming callbacks)
        yield sse_event({"status": "started", "query": query})
        await asyncio.sleep(0.1)
        # Execute agent in a worker thread so the event loop stays responsive
        result = await asyncio.to_thread(agent.invoke, {"input": query})
        # Stream intermediate steps
        for i, step in enumerate(result.get("intermediate_steps", [])):
            action, observation = step
            # tool_input may be a str or a dict, so convert before truncating
            tool_input = str(action.tool_input)[:100] + "..."
            yield sse_event({"type": "step", "number": i + 1, "tool": action.tool, "input": tool_input})
            await asyncio.sleep(0.1)
        # Stream final output line by line
        for line in result["output"].split("\n"):
            yield sse_event({"type": "output", "content": line})
            await asyncio.sleep(0.05)
        yield sse_event({"status": "completed"})
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield sse_event({"status": "error", "message": str(e)})


@app.post("/agent/stream", tags=["Agent"])
async def query_agent_stream(request: AgentRequest):
    """
    Execute agent query with streaming response
    This endpoint returns Server-Sent Events (SSE) for real-time updates.
    """
    logger.info(f"Streaming query: {request.query}")
    return StreamingResponse(
        stream_agent_response(request.query, request.max_iterations),
        media_type="text/event-stream"
    )


@app.get("/metrics", tags=["Monitoring"])
async def metrics():
    """Get API metrics"""
    uptime = (datetime.now() - start_time).total_seconds()
    return {
        "total_requests": request_count,
        "uptime_seconds": uptime,
        "agent_cached": "main_agent" in agent_cache,
        "timestamp": datetime.now().isoformat()
    }


# Startup/Shutdown Events
@app.on_event("startup")
async def startup_event():
    """Initialize on startup"""
    logger.info("Starting Agent API...")
    logger.info("Preloading agent executor...")
    get_agent_executor()
    logger.info("Agent API ready!")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    logger.info("Shutting down Agent API...")


# Run with: uvicorn exercise5_agent_api:app --reload --port 8000
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Step 3: Launch the API
uvicorn exercise5_agent_api:app --reload --port 8000
Step 4: Test the API
Option 1: Swagger UI (Recommended)
Open your browser: http://localhost:8000/docs
Option 2: curl Commands
# Health check
curl http://localhost:8000/health
# Non-streaming query
curl -X POST http://localhost:8000/agent/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "What is 15 * 73?",
    "max_iterations": 5
  }'
# Streaming query
curl -N -X POST http://localhost:8000/agent/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Search for the population of Paris and calculate 10% of that number",
    "stream": true
  }'
# Metrics
curl http://localhost:8000/metrics
Option 3: Python Client
# test_agent_api.py
import requests
import json

# Test non-streaming
response = requests.post(
    "http://localhost:8000/agent/query",
    json={"query": "What is the capital of France?"}
)
print(json.dumps(response.json(), indent=2))

# Test streaming
response = requests.post(
    "http://localhost:8000/agent/stream",
    json={"query": "Calculate 123 * 456"},
    stream=True
)
for line in response.iter_lines():
    if line:
        print(line.decode('utf-8'))
Expected Output (non-streaming query)
{
  "request_id": "a3f2c891-...",
  "query": "What is 15 * 73?",
  "output": "The result of 15 * 73 is 1095.",
  "iterations": 2,
  "tools_used": ["Calculator"],
  "timestamp": "2024-01-15T10:30:45..."
}
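On the client side, each streamed line arrives as raw bytes of the form `data: {...}`. The sketch below shows one way to turn those lines into dictionaries. The helper names `parse_sse_line` and `iter_sse_payloads` are hypothetical, and the sample lines are made up to match the shape the streaming endpoint emits; they are not captured output.

```python
import json
from typing import Iterable, Iterator


def parse_sse_line(raw: bytes):
    """Decode one 'data: {...}' line into a dict; return None for anything else."""
    text = raw.decode("utf-8").strip()
    if not text.startswith("data:"):
        return None
    try:
        return json.loads(text[len("data:"):].strip())
    except json.JSONDecodeError:
        return None  # malformed payload: skip it rather than crash the client


def iter_sse_payloads(lines: Iterable[bytes]) -> Iterator[dict]:
    """Yield decoded payloads, skipping blank, non-data, or malformed lines."""
    for raw in lines:
        payload = parse_sse_line(raw)
        if payload is not None:
            yield payload


# Hypothetical sample lines in the shape the /agent/stream endpoint emits
sample = [
    b'data: {"status": "started", "query": "Calculate 123 * 456"}',
    b"",
    b'data: {"type": "output", "content": "56088"}',
    b'data: {"status": "completed"}',
]
for payload in iter_sse_payloads(sample):
    print(payload)
```

With the `requests` client from Option 3, the same generator could be fed directly with `iter_sse_payloads(response.iter_lines())`.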
Production Considerations
- Authentication: Add API keys or OAuth
- Rate Limiting: Prevent abuse with throttling
- Caching: Cache agent responses for common queries
- Monitoring: Track latency, errors, costs
- Queueing: Handle high load with task queues
- Timeouts: Set maximum execution time
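As an illustration of the timeout item, here is a minimal sketch that bounds an awaitable with `asyncio.wait_for`. The names `run_with_timeout` and `slow_agent` are hypothetical; `slow_agent` only sleeps and stands in for a real agent call.

```python
import asyncio


async def run_with_timeout(coro, seconds: float):
    """Await coro; return (True, result), or (False, None) if it times out."""
    try:
        return True, await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return False, None


async def slow_agent(delay: float) -> str:
    """Stand-in for an agent call: sleeps instead of calling an LLM."""
    await asyncio.sleep(delay)
    return "done"


print(asyncio.run(run_with_timeout(slow_agent(0.01), 1.0)))   # (True, 'done')
print(asyncio.run(run_with_timeout(slow_agent(1.0), 0.05)))   # (False, None)
```

Note that `asyncio.wait_for` cancels the awaited coroutine, but it cannot interrupt blocking work already running in a thread, so a synchronous agent call may keep running in the background after the timeout fires.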
Security Reminders
- Never expose production API keys in code
- Implement request validation and sanitization
- Add authentication before deploying publicly
- Set resource limits to prevent abuse
- Log all requests for audit trails
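A minimal sketch of the first and third points, using only the standard library: the expected key is read from the environment rather than hard-coded, and the comparison runs in constant time. The header name `X-API-Key`, the environment variable `AGENT_API_KEY`, and the function `is_authorized` are assumptions for illustration, not part of the lab code.

```python
import hmac
import os


def is_authorized(headers: dict, expected_key: str) -> bool:
    """Compare the X-API-Key header with the expected key in constant time."""
    if not expected_key:
        return False  # refuse every request when no key is configured
    supplied = headers.get("X-API-Key", "")
    return hmac.compare_digest(supplied.encode(), expected_key.encode())


# The key is read from the environment, never written into source code.
expected = os.environ.get("AGENT_API_KEY", "")
print(is_authorized({"X-API-Key": "guess"}, expected))
```

In a FastAPI service, a check like this would typically live in a dependency that raises a 401 response when it returns False.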
Bonus Challenge
Deploy your API to a cloud platform (AWS Lambda, Google Cloud Run, or Railway). Add authentication using JWT tokens. Implement a simple React frontend that calls your API and displays streaming responses in real-time.
Deliverable
A production-ready API with:
- RESTful endpoints for agent queries
- Streaming response support (SSE)
- Health check and metrics endpoints
- Comprehensive error handling
- Request logging and monitoring
- Auto-generated API documentation (Swagger)
Lab Complete!
Congratulations on building a complete multi-agent system!
What You Built:
- ReAct agent with tool use
- LangGraph state machine with workflows
- Multi-agent collaboration system (CrewAI)
- Agent memory system (short- and long-term)
- Production FastAPI service
Next: Take the Module 3 Quiz to test your knowledge!
Estimated Costs
Total Lab Cost: $5-10 (GPT-4 API)
- Exercise 1 (ReAct): ~$0.50
- Exercise 2 (LangGraph): ~$1-2
- Exercise 3 (Multi-Agent): ~$2-4
- Exercise 4 (Memory): ~$1-2
- Exercise 5 (API): ~$0.50-1
Cost Saving Tip: Use GPT-3.5-turbo for testing (10x cheaper), then switch to GPT-4 for final runs.
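As a quick sanity check, the per-exercise estimates can be summed to confirm they fall within the stated $5-10 total. This is a small sketch that copies the ranges listed above; the dictionary name `estimates` is just for illustration.

```python
# (low, high) estimates in USD, copied from the per-exercise list
estimates = {
    "Exercise 1 (ReAct)": (0.50, 0.50),
    "Exercise 2 (LangGraph)": (1.00, 2.00),
    "Exercise 3 (Multi-Agent)": (2.00, 4.00),
    "Exercise 4 (Memory)": (1.00, 2.00),
    "Exercise 5 (API)": (0.50, 1.00),
}

low = sum(lo for lo, _ in estimates.values())
high = sum(hi for _, hi in estimates.values())
print(f"${low:.2f} - ${high:.2f}")  # $5.00 - $9.50
```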
Troubleshooting
Common Issues
1. API Key Errors
Error: Invalid API key
Solution: Verify that the .env file contains the correct keys and that load_dotenv() runs before the keys are read
2. Import Errors
ModuleNotFoundError: No module named 'langchain'
Solution: Activate your virtual environment, then reinstall the packages with pip
3. Agent Loops Forever
Agent exceeded maximum iterations
Solution: Cap max_iterations so runaway loops stop early, simplify the query, and check that each tool description states clearly when the tool applies
4. Memory Persistence Issues
ChromaDB collection not found
Solution: Check persist_directory path, verify write permissions
5. API Won't Start
Address already in use
Solution: Kill process on port 8000 or use different port
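For the last issue, a short check can tell whether something is already listening on the port before you launch the server. This is a sketch using the standard `socket` module; the function name `port_in_use` is hypothetical.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a connection to host:port succeeds, i.e. something is listening."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((host, port)) == 0


if port_in_use(8000):
    print("Port 8000 is busy; stop the other process or pass --port 8001 to uvicorn")
else:
    print("Port 8000 is free")
```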
Going Further
- Advanced Patterns: Implement hierarchical agent teams with AutoGPT
- Custom Tools: Build domain-specific tools (database queries, API calls)
- Human-in-the-Loop: Add approval steps for critical agent actions
- Evaluation: Create test suites to measure agent performance
- Cost Optimization: Implement caching, prompt compression, model routing
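As a starting point for the caching idea, here is a minimal in-memory sketch that stores agent responses by normalized query text with a time-to-live. The class `ResponseCache` and its methods are hypothetical, and a real deployment would more likely use a shared store such as Redis.

```python
import time


class ResponseCache:
    """Tiny in-memory cache keyed by normalized query, with a time-to-live."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store = {}  # normalized query -> (expiry time, response)

    @staticmethod
    def _key(query: str) -> str:
        # Lowercase and collapse whitespace so trivial variants share an entry
        return " ".join(query.lower().split())

    def get(self, query: str):
        entry = self._store.get(self._key(query))
        if entry is None or entry[0] < time.monotonic():
            return None  # missing or expired
        return entry[1]

    def put(self, query: str, response: str) -> None:
        self._store[self._key(query)] = (time.monotonic() + self.ttl, response)


cache = ResponseCache(ttl_seconds=60)
cache.put("What is 15 * 73?", "1095")
print(cache.get("what is  15 * 73?"))  # 1095
```

Checking the cache before invoking the agent avoids paying for repeated identical queries, at the cost of possibly serving stale answers for search-based questions.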