MODULE 3 - HANDS-ON LAB ⏱️ 2.5-3 hours 💻 5 exercises 🎯 Advanced

Build a Multi-Agent Research Team

Create autonomous AI agents that collaborate to conduct research, analyze data, and generate comprehensive reports

Lab Overview

In this advanced lab, you'll build a complete multi-agent system that showcases the power of collaborative AI. You'll start with simple ReAct agents, progress through LangGraph state machines, and culminate in a specialized team of agents working together with memory and production-grade APIs.

Final System Architecture

🔍 Researcher Agent
→
✍️ Writer Agent
→
🎯 Critic Agent
→
📊 Final Report

Each agent has specialized tools, persistent memory, and the ability to collaborate through a shared state machine

What You'll Build

  • Exercise 1: Simple ReAct agent with tool use (search, calculator)
  • Exercise 2: LangGraph state machine with complex workflows
  • Exercise 3: Multi-agent system using CrewAI (researcher, writer, critic)
  • Exercise 4: Agent memory system with vector storage
  • Exercise 5: Production FastAPI service with streaming
⚠️ Prerequisites
  • Python 3.10+ installed
  • OpenAI API key ($5-10 budget for this lab)
  • SerpAPI key (free tier available)
  • Serper.dev API key for Exercise 3 (SerperDevTool reads SERPER_API_KEY)
  • Basic understanding of async Python
  • Terminal/command line proficiency
💡 Pro Tips for Success
  • Start with smaller test queries before running expensive multi-agent workflows
  • Monitor your API costs throughout the lab
  • Read all instructions before starting each exercise
  • Save your work frequently (agents can be unpredictable!)
  • Review the expected outputs before testing
EXERCISE 1 ⏱️ 20 minutes

Build a Simple ReAct Agent

Learn the fundamentals of agentic AI by implementing a ReAct (Reasoning + Acting) agent with LangChain. Your agent will use external tools (web search and calculator) to answer questions it couldn't solve alone.

Objectives

Step 1: Environment Setup

First, install the required packages and set up your API keys:

# Install dependencies
pip install langchain langchain-openai langchain-community langchainhub python-dotenv google-search-results

# Create .env file
cat > .env << 'EOF'
OPENAI_API_KEY=your_openai_api_key_here
SERPAPI_API_KEY=your_serpapi_key_here
EOF

Step 2: Create the ReAct Agent

Create exercise1_react_agent.py with the following code:

"""
Exercise 1: Simple ReAct Agent
Implements Reasoning + Acting pattern with tools
"""

import os
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain import hub

# Load environment variables
load_dotenv()

def create_calculator_tool():
    """Simple calculator tool for mathematical operations"""
    def calculate(expression: str) -> str:
        """Evaluate a mathematical expression. Input should be a valid Python expression."""
        try:
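            # Restricted eval: empty builtins block most names, but this is NOT a true
            # sandbox. Do not expose it to untrusted input without a real expression parser.
            result = eval(expression, {"__builtins__": {}}, {})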
            return f"Result: {result}"
        except Exception as e:
            return f"Error: {str(e)}"

    return Tool(
        name="Calculator",
        func=calculate,
        description="Useful for mathematical calculations. Input should be a valid Python expression like '2 + 2' or '(15 * 7) / 3'"
    )

def create_search_tool():
    """Web search tool using SerpAPI"""
    search = SerpAPIWrapper()
    return Tool(
        name="Search",
        func=search.run,
        description="Useful for searching the internet for current information. Input should be a search query."
    )

def create_react_agent_executor():
    """Create and configure the ReAct agent"""
    # Initialize LLM
    llm = ChatOpenAI(
        model="gpt-4",
        temperature=0,
        verbose=True
    )

    # Create tools
    tools = [
        create_search_tool(),
        create_calculator_tool()
    ]

    # Get the ReAct prompt template from LangChain hub
    prompt = hub.pull("hwchase17/react")

    # Create the agent
    agent = create_react_agent(llm, tools, prompt)

    # Create executor
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5
    )

    return agent_executor

def main():
    """Test the ReAct agent with sample queries"""
    print("=" * 80)
    print("EXERCISE 1: ReAct Agent with Tools")
    print("=" * 80)

    agent = create_react_agent_executor()

    # Test queries that require different tools
    test_queries = [
        "What is the current population of Tokyo?",
        "Calculate the result of (1234 + 5678) * 3",
        "Who won the 2023 FIFA Women's World Cup and what was the final score?"
    ]

    for i, query in enumerate(test_queries, 1):
        print(f"\n{'=' * 80}")
        print(f"Query {i}: {query}")
        print(f"{'=' * 80}\n")

        try:
            result = agent.invoke({"input": query})
            print(f"\n✓ Final Answer: {result['output']}")
        except Exception as e:
            print(f"\n✗ Error: {str(e)}")

        print("\n" + "=" * 80)

if __name__ == "__main__":
    main()
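
The calculator tool passes model-written text to eval, and emptying the builtins does not make that safe. One stricter option is to parse the expression and allow only arithmetic nodes. The sketch below is an illustration, not part of the lab's reference code: the name safe_calculate is hypothetical, and it does not bound result size, so very large exponents can still be slow.

```python
import ast
import operator

# Whitelisted arithmetic operators; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def _is_number(node: ast.AST) -> bool:
    """True for int/float literals (bool is excluded on purpose)."""
    return (
        isinstance(node, ast.Constant)
        and isinstance(node.value, (int, float))
        and not isinstance(node.value, bool)
    )


def safe_calculate(expression: str) -> str:
    """Evaluate +, -, *, /, //, %, ** on numeric literals only."""

    def _eval(node: ast.AST):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if _is_number(node):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return f"Result: {_eval(tree)}"
    except Exception as e:
        return f"Error: {e}"


print(safe_calculate("(1234 + 5678) * 3"))   # Result: 20736
print(safe_calculate("__import__('os')"))    # rejected: function calls are not whitelisted
```

Because it takes a string and returns a string, safe_calculate could be passed as func= to the Calculator Tool in place of calculate.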

Step 3: Run and Observe

Execute the agent and watch the ReAct loop in action:

python exercise1_react_agent.py
Expected Output Pattern:
Thought: I need to search for the current population of Tokyo
Action: Search
Action Input: "Tokyo population 2024"
Observation: Tokyo has a population of approximately 14 million...
Thought: I now know the final answer
Final Answer: The current population of Tokyo is approximately 14 million people...
💡 What's Happening Here?

The agent is following the ReAct pattern:

  1. Thought: Agent reasons about what to do
  2. Action: Agent selects a tool to use
  3. Action Input: Agent provides input to the tool
  4. Observation: Agent receives tool output
  5. Repeat or Answer: Loop continues until agent has final answer
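
The five steps above can be sketched as a plain loop with no LLM involved. This is a simplified illustration, not how AgentExecutor is implemented: run_react_loop and scripted_model are hypothetical names, and the "model" is a hard-coded function standing in for GPT-4.

```python
import re
from typing import Callable, Dict


def run_react_loop(
    model: Callable[[str], str],
    tools: Dict[str, Callable[[str], str]],
    question: str,
    max_iterations: int = 5,
) -> str:
    """Alternate model steps and tool calls until a final answer appears."""
    scratchpad = f"Question: {question}\n"
    for _ in range(max_iterations):
        step = model(scratchpad)  # Thought + Action, or Thought + Final Answer
        scratchpad += step + "\n"

        final = re.search(r"Final Answer:\s*(.*)", step, re.DOTALL)
        if final:
            return final.group(1).strip()

        action = re.search(r"Action:\s*(.+)", step)
        action_input = re.search(r"Action Input:\s*(.+)", step)
        if not (action and action_input):
            scratchpad += "Observation: could not parse an action\n"
            continue

        tool_name = action.group(1).strip()
        tool = tools.get(tool_name)
        observation = tool(action_input.group(1).strip()) if tool else f"unknown tool {tool_name}"
        scratchpad += f"Observation: {observation}\n"  # fed back into the next model step
    return "Stopped: iteration limit reached"


def scripted_model(scratchpad: str) -> str:
    """Hypothetical stand-in for an LLM: answers once it has seen an observation."""
    if "Observation:" not in scratchpad:
        return "Thought: I should compute this.\nAction: Calculator\nAction Input: 2 + 3"
    return "Thought: I now know the final answer\nFinal Answer: 5"


tools = {"Calculator": lambda expr: str(sum(int(x) for x in expr.split("+")))}
print(run_react_loop(scripted_model, tools, "What is 2 + 3?"))  # prints 5
```

The max_iterations argument plays the same role as max_iterations=5 in the AgentExecutor: it stops a model that never produces a final answer.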

🌟 Bonus Challenge

Add a custom "Wikipedia" tool that fetches article summaries. Test it with queries like "What is quantum computing according to Wikipedia?"

Deliverable

✅ A working ReAct agent that can:

  • Search the web for current information
  • Perform mathematical calculations
  • Chain multiple tool calls together
  • Provide reasoning for its actions
EXERCISE 2 ⏱️ 30 minutes

Build a LangGraph State Machine

Move beyond linear agent execution to create complex workflows with conditional logic, loops, and state management using LangGraph.

Objectives

Step 1: Install LangGraph

pip install langgraph

Step 2: Design the Workflow

Research Workflow Graph

START
→
Plan
→
Research
→
Quality Check
→
Synthesize
→
END

If quality check fails, loop back to Research node

Step 3: Implement the State Machine

Create exercise2_langgraph.py:

"""
Exercise 2: LangGraph State Machine
Implements a complex research workflow with conditional logic
"""

from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage

# Define the state schema
class ResearchState(TypedDict):
    """State passed between nodes in the graph"""
    topic: str
    research_plan: str
    research_results: Annotated[List[str], operator.add]  # Accumulates results
    quality_score: int
    final_report: str
    iteration_count: int

# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

def plan_research(state: ResearchState) -> dict:
    """Node: Create a research plan"""
    print("\n🎯 PLANNING PHASE")
    print(f"Topic: {state['topic']}")

    messages = [
        SystemMessage(content="You are a research planner. Create a structured research plan."),
        HumanMessage(content=f"Create a research plan for: {state['topic']}\n\nProvide 3-4 key areas to investigate.")
    ]

    response = llm.invoke(messages)

    print(f"Plan created: {len(response.content)} characters")
    # Return only the fields this node changes; LangGraph merges them into the state
    return {"research_plan": response.content, "iteration_count": 0}

def conduct_research(state: ResearchState) -> dict:
    """Node: Conduct research based on plan"""
    print("\n🔍 RESEARCH PHASE")

    messages = [
        SystemMessage(content="You are a research analyst. Gather detailed information."),
        HumanMessage(content=f"Research Plan:\n{state['research_plan']}\n\nProvide comprehensive findings for the topic: {state['topic']}")
    ]

    response = llm.invoke(messages)

    print(f"Research completed: {len(response.content)} characters")
    # research_results uses the operator.add reducer, so return only the NEW item;
    # returning the full list would append it to the existing one and duplicate entries
    return {
        "research_results": [response.content],
        "iteration_count": state["iteration_count"] + 1,
    }

def check_quality(state: ResearchState) -> dict:
    """Node: Evaluate research quality"""
    print("\n✓ QUALITY CHECK PHASE")

    messages = [
        SystemMessage(content="You are a quality evaluator. Rate research quality from 1-10."),
        HumanMessage(content=f"Evaluate this research:\n\n{state['research_results'][-1]}\n\nProvide a score (1-10) and brief reasoning.")
    ]

    response = llm.invoke(messages)

    # Extract the first integer between 1 and 10, so "8/10" and "7." both parse
    # (simple parsing - in production use structured output)
    numbers = "".join(ch if ch.isdigit() else " " for ch in response.content).split()
    valid_scores = [int(n) for n in numbers if 1 <= int(n) <= 10]
    score = valid_scores[0] if valid_scores else 7  # Default to passing score

    print(f"Quality score: {score}/10")
    return {"quality_score": score}

def synthesize_report(state: ResearchState) -> dict:
    """Node: Create final report"""
    print("\n📊 SYNTHESIS PHASE")

    all_research = "\n\n---\n\n".join(state['research_results'])

    messages = [
        SystemMessage(content="You are a report writer. Synthesize research into a comprehensive report."),
        HumanMessage(content=f"Create a final report on: {state['topic']}\n\nBased on this research:\n\n{all_research}")
    ]

    response = llm.invoke(messages)

    print(f"Final report: {len(response.content)} characters")
    return {"final_report": response.content}

def should_continue_research(state: ResearchState) -> str:
    """Conditional edge: Decide if more research is needed"""
    # Continue if quality is low and we haven't iterated too many times
    if state['quality_score'] < 7 and state['iteration_count'] < 2:
        print(f"⚠️ Quality below threshold ({state['quality_score']}/10), conducting additional research...")
        return "research"
    else:
        print(f"✓ Quality acceptable ({state['quality_score']}/10), proceeding to synthesis")
        return "synthesize"

def create_research_graph():
    """Build the LangGraph workflow"""
    workflow = StateGraph(ResearchState)

    # Add nodes
    workflow.add_node("plan", plan_research)
    workflow.add_node("research", conduct_research)
    workflow.add_node("quality_check", check_quality)
    workflow.add_node("synthesize", synthesize_report)

    # Add edges
    workflow.set_entry_point("plan")
    workflow.add_edge("plan", "research")
    workflow.add_edge("research", "quality_check")

    # Conditional edge - can loop back to research
    workflow.add_conditional_edges(
        "quality_check",
        should_continue_research,
        {
            "research": "research",
            "synthesize": "synthesize"
        }
    )

    workflow.add_edge("synthesize", END)

    return workflow.compile()

def main():
    """Test the LangGraph workflow"""
    print("=" * 80)
    print("EXERCISE 2: LangGraph State Machine")
    print("=" * 80)

    # Create the graph
    app = create_research_graph()

    # Test with a research topic
    initial_state = {
        "topic": "The impact of GPT-4 on software development productivity",
        "research_plan": "",
        "research_results": [],
        "quality_score": 0,
        "final_report": "",
        "iteration_count": 0
    }

    # Run the workflow
    final_state = app.invoke(initial_state)

    # Display results
    print("\n" + "=" * 80)
    print("FINAL RESULTS")
    print("=" * 80)
    print(f"\n📋 Research Plan:\n{final_state['research_plan'][:200]}...")
    print(f"\n🔁 Iterations: {final_state['iteration_count']}")
    print(f"\n⭐ Final Quality Score: {final_state['quality_score']}/10")
    print(f"\n📊 Final Report:\n{final_state['final_report'][:500]}...")

if __name__ == "__main__":
    main()

Step 4: Run the Workflow

python exercise2_langgraph.py
Expected Output:
🎯 PLANNING PHASE
Topic: The impact of GPT-4 on software development productivity
Plan created: 543 characters

🔍 RESEARCH PHASE
Research completed: 1247 characters

✓ QUALITY CHECK PHASE
Quality score: 6/10
⚠️ Quality below threshold (6/10), conducting additional research...

🔍 RESEARCH PHASE
Research completed: 1389 characters

✓ QUALITY CHECK PHASE
Quality score: 8/10
✓ Quality acceptable (8/10), proceeding to synthesis

📊 SYNTHESIS PHASE
Final report: 2156 characters
💡 Key LangGraph Concepts
  • State: Shared dictionary passed between nodes
  • Nodes: Functions that read the state and return the fields they update
  • Edges: Define workflow progression
  • Conditional Edges: Branch based on state values
  • Annotations: Control how state fields are updated (e.g., operator.add)
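
To make the Annotations bullet concrete, the following sketch simulates how a partial update from a node could be merged into the state: fields annotated with a reducer such as operator.add are combined with the existing value, while plain fields are overwritten. It is a simplified model of the merge rule, not LangGraph's implementation, and DemoState and apply_update are hypothetical names.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class DemoState(TypedDict):
    research_results: Annotated[List[str], operator.add]  # merged with a reducer
    quality_score: int                                     # plain overwrite


def apply_update(state: Dict[str, Any], update: Dict[str, Any], schema: type) -> Dict[str, Any]:
    """Merge a node's partial update into the state, honoring Annotated reducers."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducers = getattr(hints[key], "__metadata__", ())
        if reducers and key in merged:
            merged[key] = reducers[0](merged[key], value)  # e.g. old_list + new_list
        else:
            merged[key] = value                            # last write wins
    return merged


state = {"research_results": ["first pass"], "quality_score": 6}
state = apply_update(state, {"research_results": ["second pass"], "quality_score": 8}, DemoState)
print(state)
# {'research_results': ['first pass', 'second pass'], 'quality_score': 8}
```

One consequence of this rule: if a node returns the whole accumulated list for a reducer field instead of just the new items, the old entries are added a second time. Returning only what changed avoids that.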

🌟 Bonus Challenge

Add a "human_review" node that requires human input before finalizing the report. Use input() to pause execution and collect feedback.
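
As a starting point for this challenge, a review node might look like the sketch below. The state fields human_approved and human_feedback are assumptions (they would have to be added to ResearchState), and the ask parameter exists only so the function can be exercised without a terminal.

```python
from typing import Callable, Dict


def human_review(state: Dict, ask: Callable[[str], str] = input) -> Dict:
    """Hypothetical node: show a report preview and collect reviewer feedback."""
    print("\nHUMAN REVIEW")
    print(state["final_report"][:500])

    feedback = ask("Approve this report? Type 'yes' or describe changes: ").strip()
    approved = feedback.lower() in {"y", "yes"}

    # Return only the fields this node changes (a partial state update)
    return {"human_approved": approved, "human_feedback": "" if approved else feedback}


# Exercising the node with a canned answer instead of real keyboard input
print(human_review({"final_report": "Draft report text"}, ask=lambda prompt: "yes"))
```

To wire it in, one option is to register it with workflow.add_node("human_review", human_review), then route "synthesize" to "human_review" and "human_review" to END instead of connecting "synthesize" directly to END.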

Deliverable

✅ A LangGraph workflow that:

  • Plans research based on a topic
  • Conducts iterative research
  • Evaluates quality automatically
  • Loops back if quality is insufficient
  • Synthesizes a final report
EXERCISE 3 ⏱️ 35 minutes

Build a Multi-Agent Research Team

Create a collaborative system of specialized agents using CrewAI. Three agents (Researcher, Writer, Critic) will work together to produce a polished research report.

Objectives

Step 1: Install CrewAI

pip install crewai crewai-tools

Step 2: Design Agent Roles

Multi-Agent Collaboration

🔍 Researcher
Gathers facts & data
→
✍️ Writer
Creates draft report
→
🎯 Critic
Reviews & refines

Step 3: Implement Multi-Agent System

Create exercise3_crewai.py:

"""
Exercise 3: Multi-Agent Research Team with CrewAI
Three specialized agents collaborate on research tasks
"""

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from langchain_openai import ChatOpenAI
import os

# Initialize LLM (used by all agents)
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

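# Initialize search tool
# SerperDevTool reads SERPER_API_KEY from the environment. This is a Serper.dev key,
# which is a different service from the SerpAPI key used in Exercise 1.
search_tool = SerperDevTool()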

def create_researcher_agent():
    """Agent 1: Research Specialist"""
    return Agent(
        role="Senior Research Analyst",
        goal="Conduct comprehensive research and gather accurate, up-to-date information",
        backstory="""You are an experienced research analyst with a PhD in information science.
        You excel at finding relevant data, validating sources, and identifying key insights.
        You're meticulous about fact-checking and always cite your sources.""",
        tools=[search_tool],
        llm=llm,
        verbose=True,
        allow_delegation=False
    )

def create_writer_agent():
    """Agent 2: Content Writer"""
    return Agent(
        role="Technical Content Writer",
        goal="Transform research findings into clear, engaging, well-structured reports",
        backstory="""You are a skilled technical writer with 10 years of experience.
        You have a talent for explaining complex topics in accessible language.
        You structure information logically and write with clarity and precision.""",
        llm=llm,
        verbose=True,
        allow_delegation=False
    )

def create_critic_agent():
    """Agent 3: Quality Critic"""
    return Agent(
        role="Senior Quality Critic",
        goal="Review and improve reports for accuracy, clarity, and completeness",
        backstory="""You are a veteran editor and quality assurance specialist.
        You have a keen eye for inconsistencies, gaps in logic, and areas for improvement.
        You provide constructive feedback that elevates the quality of any document.""",
        llm=llm,
        verbose=True,
        allow_delegation=False
    )

def create_research_task(researcher_agent, topic):
    """Task 1: Conduct Research"""
    return Task(
        description=f"""Conduct comprehensive research on: {topic}

        Your research should include:
        1. Current state and recent developments
        2. Key statistics and data points
        3. Expert opinions and analysis
        4. Practical implications and applications
        5. Future trends and predictions

        Gather information from reliable sources and organize your findings clearly.""",
        expected_output="""A detailed research report with:
        - Executive summary
        - Key findings organized by theme
        - Supporting data and statistics
        - Source citations
        - Minimum 800 words""",
        agent=researcher_agent
    )

def create_writing_task(writer_agent, topic):
    """Task 2: Write Report (depends on research)"""
    return Task(
        description=f"""Using the research findings, write a comprehensive report on: {topic}

        Your report should:
        1. Open with an engaging introduction
        2. Present information in a logical flow
        3. Use clear headings and sections
        4. Include concrete examples
        5. Conclude with key takeaways

        Write for a technical audience but maintain clarity.""",
        expected_output="""A polished report with:
        - Executive summary (150 words)
        - Main body (1000-1500 words)
        - Conclusion with actionable insights
        - Professional formatting
        - Clear section headings""",
        agent=writer_agent,
        context=[]  # Will be populated with previous task output
    )

def create_critique_task(critic_agent, topic):
    """Task 3: Review and Refine (depends on writing)"""
    return Task(
        description=f"""Review the report on {topic} and provide your refined version.

        Evaluate:
        1. Accuracy of information
        2. Clarity of writing
        3. Logical flow and structure
        4. Completeness of coverage
        5. Overall quality and impact

        Identify any issues and provide an improved final version.""",
        expected_output="""A final report that includes:
        - Any corrections to factual errors
        - Improved clarity and readability
        - Enhanced structure if needed
        - A brief editor's note on changes made
        - Publication-ready quality""",
        agent=critic_agent,
        context=[]  # Will be populated with previous task output
    )

def create_research_crew(topic: str):
    """Assemble the multi-agent crew"""
    # Create agents
    researcher = create_researcher_agent()
    writer = create_writer_agent()
    critic = create_critic_agent()

    # Create tasks
    research_task = create_research_task(researcher, topic)
    writing_task = create_writing_task(writer, topic)
    critique_task = create_critique_task(critic, topic)

    # Set task dependencies (context)
    writing_task.context = [research_task]
    critique_task.context = [research_task, writing_task]

    # Create crew
    crew = Crew(
        agents=[researcher, writer, critic],
        tasks=[research_task, writing_task, critique_task],
        process=Process.sequential,  # Tasks execute in order
        verbose=True  # Log agent interactions (recent CrewAI versions expect a boolean here)
    )

    return crew

def main():
    """Test the multi-agent research team"""
    print("=" * 80)
    print("EXERCISE 3: Multi-Agent Research Team")
    print("=" * 80)

    # Define research topic
    topic = "The state of autonomous AI agents in 2024: capabilities, limitations, and future directions"

    print(f"\n📋 Research Topic: {topic}\n")
    print("🤖 Assembling 3-agent research team...\n")

    # Create and run crew
    crew = create_research_crew(topic)

    print("\n🚀 Starting collaborative research process...\n")
    result = crew.kickoff()

    # Display final result
    print("\n" + "=" * 80)
    print("FINAL RESEARCH REPORT")
    print("=" * 80)
    print(result)

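    # Save to file (str() also covers CrewAI versions where kickoff() returns an output object, not a string)
    with open("multi_agent_report.md", "w", encoding="utf-8") as f:
        f.write(f"# Research Report: {topic}\n\n")
        f.write(str(result))

    print("\n✓ Report saved to: multi_agent_report.md")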

if __name__ == "__main__":
    main()
⚠️ Cost Warning

This exercise uses GPT-4 for three agents with multiple LLM calls. Expected cost: $1-3 per run. Start with a simpler topic or use GPT-3.5-turbo for testing.

Step 4: Run Multi-Agent System

python exercise3_crewai.py
Expected Output Pattern:
🤖 Agent: Senior Research Analyst
💭 Thought: I need to search for recent information about AI agents...
🔧 Tool: Search
📝 Output: [Research findings...]

🤖 Agent: Technical Content Writer
💭 Thought: I'll structure this research into a comprehensive report...
📝 Output: [Draft report...]

🤖 Agent: Senior Quality Critic
💭 Thought: I'll review for accuracy and clarity...
✏️ Changes: [Improvements made...]
📝 Output: [Final polished report...]
💡 Understanding Multi-Agent Collaboration
  • Roles: Each agent has a specific identity and expertise
  • Goals: Clear objectives guide agent behavior
  • Backstory: Gives context that shapes responses
  • Tasks: Concrete deliverables with expected outputs
  • Context: Tasks can access outputs from previous tasks
  • Process: Sequential ensures proper handoffs
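
The Context and Process bullets describe a handoff pattern that can be sketched without any LLM: tasks run in order, and each one sees only the outputs of the tasks it lists as context. This is a toy model of the idea, not CrewAI's implementation; SimpleTask and run_sequential are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SimpleTask:
    name: str
    run: Callable[[Dict[str, str]], str]  # receives the outputs of its context tasks
    context: List["SimpleTask"] = field(default_factory=list)


def run_sequential(tasks: List[SimpleTask]) -> Dict[str, str]:
    """Run tasks in order, giving each one the outputs of the tasks it lists as context."""
    outputs: Dict[str, str] = {}
    for task in tasks:
        visible = {t.name: outputs[t.name] for t in task.context}
        outputs[task.name] = task.run(visible)
    return outputs


research = SimpleTask("research", lambda ctx: "fact A; fact B")
writing = SimpleTask(
    "writing",
    lambda ctx: f"Draft based on: {ctx['research']}",
    context=[research],
)
critique = SimpleTask(
    "critique",
    lambda ctx: f"Reviewed ({len(ctx)} inputs): {ctx['writing']}",
    context=[research, writing],
)

results = run_sequential([research, writing, critique])
print(results["critique"])
# Reviewed (2 inputs): Draft based on: fact A; fact B
```

The ordering matters: a task whose context has not run yet would fail with a KeyError here, which mirrors why the sequential process lists research before writing and writing before critique.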

🌟 Bonus Challenge

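Add a fourth agent: a "Fact Checker" that runs in parallel with the Writer (for example, by setting async_execution=True on its task) and validates all claims before the Critic reviews. Use Process.hierarchical for coordination; note that a hierarchical crew also needs a manager_llm or manager_agent.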

Deliverable

✅ A multi-agent system that:

  • Has three specialized agents with distinct roles
  • Executes tasks sequentially with proper handoffs
  • Produces a publication-quality research report
  • Saves output to a markdown file
EXERCISE 4 ⏱️ 25 minutes

Implement Agent Memory System

Add short-term and long-term memory to agents, enabling them to remember past interactions, learn from previous tasks, and maintain context across sessions.

Objectives

Step 1: Install Memory Dependencies

pip install chromadb langchain-community

Step 2: Implement Memory System

Create exercise4_agent_memory.py:

"""
Exercise 4: Agent Memory System
Implements short-term and long-term memory for agents
"""

from typing import List, Dict
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationChain
from langchain.schema import Document
import os
from datetime import datetime

class AgentMemory:
    """Manages short-term and long-term memory for an agent"""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.7)

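        # Short-term memory: Recent conversation
        # memory_key must be "history": ConversationChain's default prompt expects that
        # variable and raises a validation error for any other key
        self.short_term = ConversationBufferMemory(
            memory_key="history",
            return_messages=True
        )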

        # Long-term memory: Vector store for persistent knowledge
        self.embeddings = OpenAIEmbeddings()
        self.long_term = Chroma(
            collection_name=f"agent_memory_{agent_name}",
            embedding_function=self.embeddings,
            persist_directory=f"./agent_memory_{agent_name}"
        )

        # Summary memory: Condensed conversation history
        self.summary = ConversationSummaryMemory(
            llm=self.llm,
            memory_key="summary"
        )

    def remember_short_term(self, user_input: str, agent_output: str):
        """Store recent interaction in short-term memory"""
        self.short_term.save_context(
            {"input": user_input},
            {"output": agent_output}
        )

        # Also update summary
        self.summary.save_context(
            {"input": user_input},
            {"output": agent_output}
        )

    def remember_long_term(self, content: str, metadata: Dict = None):
        """Store important information in long-term vector memory"""
        if metadata is None:
            metadata = {}

        metadata.update({
            "agent": self.agent_name,
            "timestamp": datetime.now().isoformat()
        })

        doc = Document(page_content=content, metadata=metadata)
        self.long_term.add_documents([doc])

    def recall_relevant(self, query: str, k: int = 3) -> List[str]:
        """Retrieve relevant memories from long-term storage"""
        results = self.long_term.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def get_short_term_context(self) -> Dict:
        """Get recent conversation history (a dict keyed by the buffer's memory_key)"""
        return self.short_term.load_memory_variables({})

    def get_summary(self) -> Dict:
        """Get condensed summary of all interactions (a dict keyed by "summary")"""
        return self.summary.load_memory_variables({})

    def forget_short_term(self):
        """Clear short-term memory (start fresh conversation)"""
        self.short_term.clear()
        self.summary.clear()

class MemoryEnabledAgent:
    """Agent with memory capabilities"""

    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.memory = AgentMemory(name)
        self.llm = ChatOpenAI(model="gpt-4", temperature=0.7)

        # Create conversation chain with memory
        self.chain = ConversationChain(
            llm=self.llm,
            memory=self.memory.short_term,
            verbose=True
        )

    def process(self, user_input: str, use_long_term: bool = True) -> str:
        """Process input with memory context"""

        # Retrieve relevant long-term memories
        context = ""
        if use_long_term:
            relevant_memories = self.memory.recall_relevant(user_input)
            if relevant_memories:
                context = "Relevant past knowledge:\n" + "\n".join(f"- {mem}" for mem in relevant_memories)

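        # Generate response using chain (the chain saves this turn to short-term memory itself)
        full_input = f"{context}\n\nCurrent task: {user_input}" if context else user_input
        response = self.chain.predict(input=full_input)

        # Update the running summary; note this costs one extra LLM call per turn
        self.memory.summary.save_context({"input": user_input}, {"output": response})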

        # Store important insights in long-term memory
        if len(response) > 100:  # Only store substantial responses
            self.memory.remember_long_term(
                content=f"User: {user_input}\nAgent: {response}",
                metadata={"type": "interaction", "role": self.role}
            )

        return response

    def learn_fact(self, fact: str, category: str = "knowledge"):
        """Explicitly teach the agent a fact"""
        self.memory.remember_long_term(
            content=fact,
            metadata={"type": "fact", "category": category}
        )
        print(f"✓ {self.name} learned: {fact[:100]}...")

    def show_memory_summary(self):
        """Display memory status"""
        print(f"\n{'='*60}")
        print(f"Memory Status for {self.name}")
        print(f"{'='*60}")

        # Short-term: read the buffer's raw message list (works regardless of memory_key)
        messages = self.memory.short_term.chat_memory.messages
        print(f"\n📋 Recent Conversation:")
        if messages:
            print(f"  Messages: {len(messages)}")
        else:
            print("  (empty)")

        # Summary
        summary = self.memory.get_summary()
        if summary.get('summary'):
            print(f"\n📝 Conversation Summary:")
            print(f"  {summary['summary']}")

        # Long-term
        total_memories = self.memory.long_term._collection.count()
        print(f"\n🧠 Long-term Memory:")
        print(f"  Total memories: {total_memories}")

def main():
    """Test agent memory system"""
    print("=" * 80)
    print("EXERCISE 4: Agent Memory System")
    print("=" * 80)

    # Create agent with memory
    agent = MemoryEnabledAgent(
        name="ResearchBot",
        role="Research Assistant"
    )

    print("\n📚 Teaching agent some facts...\n")

    # Teach the agent domain knowledge
    agent.learn_fact(
        "LangGraph is a framework for building stateful, cyclic multi-agent workflows using a graph-based architecture.",
        category="frameworks"
    )
    agent.learn_fact(
        "CrewAI focuses on role-based agent collaboration with simple sequential or hierarchical processes.",
        category="frameworks"
    )
    agent.learn_fact(
        "ReAct (Reasoning + Acting) is a pattern where agents alternate between thinking and taking actions with tools.",
        category="patterns"
    )

    print("\n💬 Starting conversation...\n")

    # Conversation 1
    response1 = agent.process("What is LangGraph?")
    print(f"\n🤖 Agent: {response1}\n")

    # Conversation 2 (agent should remember context)
    response2 = agent.process("How is it different from CrewAI?")
    print(f"\n🤖 Agent: {response2}\n")

    # Conversation 3 (testing long-term memory retrieval)
    response3 = agent.process("Explain the ReAct pattern and how it relates to what we discussed")
    print(f"\n🤖 Agent: {response3}\n")

    # Show memory status
    agent.show_memory_summary()

    print("\n" + "=" * 80)
    print("Testing Memory Persistence")
    print("=" * 80)

    # Clear short-term, but long-term persists
    print("\n🔄 Clearing short-term memory...\n")
    agent.memory.forget_short_term()

    # New conversation should still access long-term knowledge
    response4 = agent.process("Tell me what you know about agent frameworks")
    print(f"\n🤖 Agent (new session): {response4}\n")

    agent.show_memory_summary()

    print("\n✓ Memory system test complete!")
    print("\nKey observations:")
    print("  - Agent remembers facts across sessions (long-term)")
    print("  - Agent maintains conversation context (short-term)")
    print("  - Agent can summarize lengthy conversations")
    print("  - Memories persist even after clearing short-term buffer")

if __name__ == "__main__":
    main()

Step 3: Test Memory System

python exercise4_agent_memory.py
Expected Output:
📚 Teaching agent some facts...
✓ ResearchBot learned: LangGraph is a framework for building stateful...
✓ ResearchBot learned: CrewAI focuses on role-based agent collaboration...

💬 Starting conversation...
🤖 Agent: LangGraph is a framework designed for building stateful...

🤖 Agent: LangGraph differs from CrewAI in several key ways...

📋 Recent Conversation:
Messages: 6
🧠 Long-term Memory:
Total memories: 5
💡 Memory Types Explained
  • Short-term (Buffer): Full message history of the current conversation (ConversationBufferMemory keeps every message until cleared)
  • Summary: Condensed version of all interactions (saves tokens)
  • Long-term (Vector): Semantic search over all past knowledge
  • Persistence: Long-term survives restarts, short-term doesn't
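
To see what "semantic search over past knowledge" means mechanically, here is a toy version of long-term recall: each memory is turned into a vector, and a query returns the stored texts whose vectors are closest by cosine similarity. The word-count "embedding" is an assumption made so the sketch runs offline; the lab code uses OpenAIEmbeddings and Chroma instead, and ToyVectorMemory is a hypothetical name.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (a real system would call an embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyVectorMemory:
    def __init__(self) -> None:
        self._items: List[Tuple[str, Counter]] = []

    def add(self, text: str) -> None:
        self._items.append((text, embed(text)))

    def recall(self, query: str, k: int = 3) -> List[str]:
        """Return the k stored texts most similar to the query."""
        q = embed(query)
        ranked = sorted(self._items, key=lambda item: cosine(q, item[1]), reverse=True)
        return [text for text, _ in ranked[:k]]


memory = ToyVectorMemory()
memory.add("LangGraph builds stateful, cyclic workflows as graphs.")
memory.add("CrewAI focuses on role-based agent collaboration.")
memory.add("ReAct alternates between reasoning and tool actions.")
print(memory.recall("How does CrewAI handle agent roles?", k=1))
# ['CrewAI focuses on role-based agent collaboration.']
```

Word counts only match exact tokens, so "roles" does not match "role" here. Learned embeddings are what let the real vector store retrieve memories that are related in meaning rather than in spelling.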

🌟 Bonus Challenge

Implement "importance scoring" for memories. Only store memories that exceed a certain importance threshold (as judged by the LLM). Add a forget_old_memories() method that removes low-importance memories older than 30 days.
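
One possible shape for this challenge is sketched below with an in-memory list rather than Chroma, so the filtering logic is easy to follow. Everything here is an assumption for illustration: the ScoredMemory and ImportanceFilteredStore names, the 0.0-1.0 importance scale, and the thresholds. In the real exercise the importance value would come from an LLM rating and the records would live in the vector store's metadata.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class ScoredMemory:
    content: str
    importance: float  # assumed scale 0.0-1.0, e.g. produced by an LLM rating
    created_at: datetime


class ImportanceFilteredStore:
    def __init__(self, min_importance: float = 0.5) -> None:
        self.min_importance = min_importance
        self.memories: List[ScoredMemory] = []

    def remember(self, content: str, importance: float, now: Optional[datetime] = None) -> bool:
        """Store the memory only if it meets the importance threshold."""
        if importance < self.min_importance:
            return False
        self.memories.append(ScoredMemory(content, importance, now or datetime.now()))
        return True

    def forget_old_memories(
        self,
        max_age_days: int = 30,
        keep_above: float = 0.8,
        now: Optional[datetime] = None,
    ) -> int:
        """Drop memories older than max_age_days unless their importance is >= keep_above."""
        cutoff = (now or datetime.now()) - timedelta(days=max_age_days)
        kept = [m for m in self.memories if m.created_at >= cutoff or m.importance >= keep_above]
        removed = len(self.memories) - len(kept)
        self.memories = kept
        return removed


now = datetime(2024, 6, 1)
store = ImportanceFilteredStore()
store.remember("User prefers concise reports", 0.9, now - timedelta(days=90))
store.remember("Discussed lunch options", 0.6, now - timedelta(days=45))
store.remember("Small talk", 0.2, now)  # below threshold, never stored
removed_count = store.forget_old_memories(now=now)
print(removed_count)  # 1: only the 45-day-old, lower-importance memory is dropped
```

Passing now explicitly keeps the example deterministic; in the agent it could simply default to the current time.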

Deliverable

✅ An agent memory system with:

  • Short-term conversation buffer
  • Long-term vector storage
  • Automatic memory summarization
  • Semantic memory retrieval
  • Persistence across sessions
EXERCISE 5 ⏱️ 30 minutes

Deploy Production Agent API

Build a production-ready FastAPI service that exposes your agents as RESTful endpoints with streaming responses, error handling, and monitoring.

Objectives

Step 1: Install FastAPI Dependencies

pip install fastapi uvicorn pydantic

Step 2: Create Production API

Create exercise5_agent_api.py:

"""
Exercise 5: Production Agent API
FastAPI service for deploying agents with streaming, error handling, and monitoring
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import asyncio
import logging
from datetime import datetime
import uuid

from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain import hub

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(
    title="Agent API",
    description="Production-ready API for autonomous AI agents",
    version="1.0.0"
)

# Request/Response Models
class AgentRequest(BaseModel):
    query: str = Field(..., description="The user's query for the agent")
    stream: bool = Field(False, description="Enable streaming responses")
    max_iterations: int = Field(5, description="Maximum agent iterations")

class AgentResponse(BaseModel):
    request_id: str
    query: str
    output: str
    iterations: int
    tools_used: List[str]
    timestamp: str

class HealthResponse(BaseModel):
    status: str
    timestamp: str
    uptime_seconds: float

# Global state
start_time = datetime.now()
request_count = 0
agent_cache = {}

# Agent initialization
def get_agent_executor():
    """Initialize or retrieve cached agent"""
    global agent_cache

    if "main_agent" in agent_cache:
        return agent_cache["main_agent"]

    logger.info("Initializing agent executor...")

    # LLM
    llm = ChatOpenAI(model="gpt-4", temperature=0, verbose=True)

    # Tools
    search = SerpAPIWrapper()
    tools = [
        Tool(
            name="Search",
            func=search.run,
            description="Search the internet for information"
        ),
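        Tool(
            name="Calculator",
            # NOTE: eval() with empty builtins is not a real sandbox; acceptable for a
            # local lab, but swap in a restricted evaluator before exposing this publicly
            func=lambda x: str(eval(x, {"__builtins__": {}}, {})),
            description="Perform calculations"
        )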
    ]

    # Agent
    prompt = hub.pull("hwchase17/react")
    agent = create_react_agent(llm, tools, prompt)
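    executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5,
        # Without this flag the result has no "intermediate_steps" key, so the
        # iterations and tools_used fields in the response would always be empty
        return_intermediate_steps=True
    )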

    agent_cache["main_agent"] = executor
    logger.info("Agent initialized successfully")
    return executor

# Endpoints

@app.get("/", tags=["Root"])
async def root():
    """Root endpoint"""
    return {
        "message": "Agent API is running",
        "docs": "/docs",
        "health": "/health"
    }

@app.get("/health", response_model=HealthResponse, tags=["Monitoring"])
async def health_check():
    """Health check endpoint"""
    uptime = (datetime.now() - start_time).total_seconds()
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "uptime_seconds": uptime
    }

@app.post("/agent/query", response_model=AgentResponse, tags=["Agent"])
async def query_agent(request: AgentRequest, background_tasks: BackgroundTasks):
    """
    Execute agent query (non-streaming)

    This endpoint runs the agent and returns the complete response.
    """
    global request_count
    request_count += 1

    request_id = str(uuid.uuid4())
    logger.info(f"[{request_id}] Received query: {request.query}")

    try:
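        # Get agent (shared, cached instance: max_iterations set here applies to
        # every in-flight request, so concurrent callers can override each other)
        agent = get_agent_executor()
        agent.max_iterations = request.max_iterations

        # Execute in a worker thread so the blocking call does not stall the event loop
        result = await asyncio.to_thread(agent.invoke, {"input": request.query})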

        # Extract metadata
        iterations = len(result.get("intermediate_steps", []))
        tools_used = [
            step[0].tool for step in result.get("intermediate_steps", [])
        ]

        logger.info(f"[{request_id}] Query completed: {iterations} iterations, {len(tools_used)} tool calls")

        return AgentResponse(
            request_id=request_id,
            query=request.query,
            output=result["output"],
            iterations=iterations,
            tools_used=tools_used,
            timestamp=datetime.now().isoformat()
        )

    except Exception as e:
        logger.error(f"[{request_id}] Error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

import json  # builds well-formed SSE payloads (move to the top-level imports if you prefer)


def sse_event(payload: dict) -> str:
    """Serialize a payload as one Server-Sent Events message"""
    # json.dumps escapes quotes and newlines that would otherwise break the JSON
    return f"data: {json.dumps(payload)}\n\n"


async def stream_agent_response(query: str, max_iterations: int):
    """Generate streaming agent response"""
    agent = get_agent_executor()
    agent.max_iterations = max_iterations

    try:
        # Simulate streaming (in production, use actual streaming callbacks)
        yield sse_event({"status": "started", "query": query})
        await asyncio.sleep(0.1)

        # Execute agent in a worker thread so the event loop stays responsive
        result = await asyncio.to_thread(agent.invoke, {"input": query})

        # Stream intermediate steps
        for i, step in enumerate(result.get("intermediate_steps", [])):
            action, observation = step
            yield sse_event({
                "type": "step",
                "number": i + 1,
                "tool": action.tool,
                # tool_input may be a str or a dict, so convert before truncating
                "input": str(action.tool_input)[:100] + "...",
            })
            await asyncio.sleep(0.1)

        # Stream final output
        output_lines = result["output"].split("\n")
        for line in output_lines:
            yield sse_event({"type": "output", "content": line})
            await asyncio.sleep(0.05)

        yield sse_event({"status": "completed"})

    except Exception as e:
        logger.error(f"Streaming error: {str(e)}")
        yield sse_event({"status": "error", "message": str(e)})

@app.post("/agent/stream", tags=["Agent"])
async def query_agent_stream(request: AgentRequest):
    """
    Execute agent query with streaming response

    This endpoint returns Server-Sent Events (SSE) for real-time updates.
    """
    logger.info(f"Streaming query: {request.query}")

    return StreamingResponse(
        stream_agent_response(request.query, request.max_iterations),
        media_type="text/event-stream"
    )

@app.get("/metrics", tags=["Monitoring"])
async def metrics():
    """Get API metrics"""
    uptime = (datetime.now() - start_time).total_seconds()

    return {
        "total_requests": request_count,
        "uptime_seconds": uptime,
        "agent_cached": "main_agent" in agent_cache,
        "timestamp": datetime.now().isoformat()
    }

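# Startup/Shutdown Events
# Note: newer FastAPI releases deprecate on_event in favor of a lifespan handler;
# the decorators below still work but may emit a deprecation warning.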

@app.on_event("startup")
async def startup_event():
    """Initialize on startup"""
    logger.info("Starting Agent API...")
    logger.info("Preloading agent executor...")
    get_agent_executor()
    logger.info("Agent API ready!")

@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    logger.info("Shutting down Agent API...")

# Run with: uvicorn exercise5_agent_api:app --reload --port 8000
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
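
The Calculator tool in the listing above passes model-generated text to eval(). Emptying `__builtins__` does not turn eval() into a sandbox, so before exposing the API publicly you may want a restricted evaluator. The sketch below is one possible replacement, assuming only basic arithmetic is needed; `safe_calculate`, the operator whitelist, and the length and exponent caps are illustrative choices, not part of the lab code.

```python
import ast
import operator

# Whitelisted operators; any other syntax is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

_MAX_LENGTH = 200    # arbitrary cap on expression length for this sketch
_MAX_EXPONENT = 100  # arbitrary cap to avoid huge power computations


def _evaluate(node):
    """Recursively evaluate a parsed arithmetic expression."""
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
        raise ValueError("only numeric literals are allowed")
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        left = _evaluate(node.left)
        right = _evaluate(node.right)
        if isinstance(node.op, ast.Pow) and abs(right) > _MAX_EXPONENT:
            raise ValueError("exponent too large")
        return _BINARY_OPS[type(node.op)](left, right)
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_evaluate(node.operand))
    raise ValueError("unsupported expression")


def safe_calculate(expression: str) -> str:
    """Evaluate basic arithmetic and return the result (or an error) as text."""
    if len(expression) > _MAX_LENGTH:
        return "Calculator error: expression too long"
    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return str(_evaluate(tree.body))
    except (ValueError, SyntaxError, ZeroDivisionError, OverflowError) as exc:
        return f"Calculator error: {exc}"
```

To try it, pass `func=safe_calculate` to the Calculator `Tool` instead of the lambda. Returning errors as text rather than raising lets the agent read the message and retry with a corrected expression.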

Step 3: Launch the API

uvicorn exercise5_agent_api:app --reload --port 8000

Step 4: Test the API

Option 1: Swagger UI (Recommended)

Open your browser: http://localhost:8000/docs

Option 2: curl Commands

# Health check
curl http://localhost:8000/health

# Non-streaming query
curl -X POST http://localhost:8000/agent/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "What is 15 * 73?",
    "max_iterations": 5
  }'

# Streaming query
curl -N -X POST http://localhost:8000/agent/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Search for the population of Paris and calculate 10% of that number",
    "stream": true
  }'

# Metrics
curl http://localhost:8000/metrics

Option 3: Python Client

# test_agent_api.py
# Requires the requests package, which Step 1 does not install: pip install requests
import requests
import json

# Test non-streaming
response = requests.post(
    "http://localhost:8000/agent/query",
    json={"query": "What is the capital of France?"}
)
print(json.dumps(response.json(), indent=2))

# Test streaming
response = requests.post(
    "http://localhost:8000/agent/stream",
    json={"query": "Calculate 123 * 456"},
    stream=True
)

for line in response.iter_lines():
    if line:
        print(line.decode('utf-8'))
Expected API Response:
{
  "request_id": "a3f2c891-...",
  "query": "What is 15 * 73?",
  "output": "The result of 15 * 73 is 1095.",
  "iterations": 2,
  "tools_used": ["Calculator"],
  "timestamp": "2024-01-15T10:30:45..."
}
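
The streaming endpoint sends its updates as `data: ...` lines rather than one JSON document, so a client has to decode each event separately. The helper below is a small sketch of that step, assuming each event carries a single-line JSON payload as in this exercise; `parse_sse_events` is a hypothetical name, and the raw fallback is there in case a payload is not valid JSON.

```python
import json
from typing import Iterable, Iterator


def parse_sse_events(lines: Iterable[bytes]) -> Iterator[dict]:
    """Yield one dict per `data:` line from an SSE byte-line stream."""
    for raw in lines:
        if not raw:
            continue  # blank lines separate events
        text = raw.decode("utf-8")
        if not text.startswith("data:"):
            continue  # ignore comments and other SSE fields
        payload = text[len("data:"):].strip()
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            # Keep malformed payloads visible instead of dropping them
            yield {"type": "raw", "content": payload}
```

With the Python client from Option 3, this could be used as `for event in parse_sse_events(response.iter_lines()): print(event)`, which makes it easy to branch on `event.get("type")` or `event.get("status")`.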
💡 Production Best Practices
  • Authentication: Add API keys or OAuth
  • Rate Limiting: Prevent abuse with throttling
  • Caching: Cache agent responses for common queries
  • Monitoring: Track latency, errors, costs
  • Queueing: Handle high load with task queues
  • Timeouts: Set maximum execution time
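
As an illustration of the rate-limiting item, here is a minimal token-bucket sketch using only the standard library. `TokenBucket` and its parameters are hypothetical, the injectable `clock` exists only to make the behavior testable, and a real deployment would more likely rely on an existing rate-limiting library or an API gateway.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_second`."""

    def __init__(self, rate_per_second: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_second
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated_at = clock()

    def allow(self) -> bool:
        """Consume one token if available; return False when the caller should be throttled."""
        now = self.clock()
        elapsed = now - self.updated_at
        self.updated_at = now
        # Refill proportionally to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

In the API you could keep one bucket per client (for example in a dict keyed by API key) and raise `HTTPException(status_code=429)` from the endpoint whenever `allow()` returns False. The bucket is not thread-safe as written, which is acceptable only if it is touched from the event loop alone.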
⚠️ Security Considerations
  • Never expose production API keys in code
  • Implement request validation and sanitization
  • Add authentication before deploying publicly
  • Set resource limits to prevent abuse
  • Log all requests for audit trails
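
A simple way to start on the authentication item is a shared API key checked in constant time. The sketch below assumes the expected key lives in an environment variable; the name `AGENT_API_KEY` and the function `is_authorized` are hypothetical, and this is a stopgap rather than a substitute for OAuth or JWT.

```python
import hmac
import os
from typing import Optional


def is_authorized(presented_key: Optional[str], expected_key: Optional[str] = None) -> bool:
    """Return True only if the presented key matches the configured key."""
    if expected_key is None:
        # Assumed variable name; never hard-code the key in source
        expected_key = os.environ.get("AGENT_API_KEY", "")
    if not expected_key or not presented_key:
        return False  # fail closed when nothing is configured or nothing is sent
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(presented_key.encode("utf-8"), expected_key.encode("utf-8"))
```

In a FastAPI endpoint, the presented key would typically come from a request header, and the handler would raise `HTTPException(status_code=401)` when the check fails.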

🌟 Bonus Challenge

Deploy your API to a cloud platform (AWS Lambda, Google Cloud Run, or Railway). Add authentication using JWT tokens. Implement a simple React frontend that calls your API and displays streaming responses in real-time.

Deliverable

✅ A production-ready API with:

  • RESTful endpoints for agent queries
  • Streaming response support (SSE)
  • Health check and metrics endpoints
  • Comprehensive error handling
  • Request logging and monitoring
  • Auto-generated API documentation (Swagger)

🎉 Lab Complete!

Congratulations on building a complete multi-agent system!

What You Built:

  • ✓ ReAct agent with tool use
  • ✓ LangGraph state machine with workflows
  • ✓ Multi-agent collaboration system (CrewAI)
  • ✓ Agent memory system (short & long-term)
  • ✓ Production FastAPI service

Next: Take the Module 3 Quiz to test your knowledge!


Estimated Costs

Total Lab Cost: $5-10 (GPT-4 API)

  • Exercise 1 (ReAct): ~$0.50
  • Exercise 2 (LangGraph): ~$1-2
  • Exercise 3 (Multi-Agent): ~$2-4
  • Exercise 4 (Memory): ~$1-2
  • Exercise 5 (API): ~$0.50-1

💡 Cost Saving Tip: Use GPT-3.5-turbo for testing (10x cheaper), then switch to GPT-4 for final runs.

Troubleshooting

Common Issues

1. API Key Errors

Error: Invalid API key

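Solution: Verify that the .env file has the correct keys and that they actually reach the Python process. Running source .env only helps if the file uses export lines (or you run set -a first); plain KEY=value lines are not passed on to programs started from that shell.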

2. Import Errors

ModuleNotFoundError: No module named 'langchain'

Solution: Activate virtual environment, reinstall packages

3. Agent Loops Forever

Agent exceeded maximum iterations

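Solution: Simplify the query and check that each tool description clearly states when the tool applies. Lower max_iterations while debugging to cap cost; lowering it does not fix the loop itself.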

4. Memory Persistence Issues

ChromaDB collection not found

Solution: Check persist_directory path, verify write permissions

5. API Won't Start

Address already in use

Solution: Kill process on port 8000 or use different port
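
Two of the issues above (missing keys and a busy port) can be caught before launching anything. The following is a small preflight sketch; the function names are hypothetical, and the variable names `OPENAI_API_KEY` and `SERPAPI_API_KEY` in the usage note are assumed to match what your .env file defines.

```python
import os
import socket
from typing import Iterable, Mapping, Optional


def missing_env_vars(names: Iterable[str],
                     environ: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the names that are unset or empty in the environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False  # typically "Address already in use"
    return True
```

For example, `missing_env_vars(["OPENAI_API_KEY", "SERPAPI_API_KEY"])` should return an empty list and `port_is_free(8000)` should return True before you start uvicorn. If the port is busy, pass a different `--port` value.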

Going Further

  • Advanced Patterns: Implement hierarchical agent teams with AutoGPT
  • Custom Tools: Build domain-specific tools (database queries, API calls)
  • Human-in-the-Loop: Add approval steps for critical agent actions
  • Evaluation: Create test suites to measure agent performance
  • Cost Optimization: Implement caching, prompt compression, model routing
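
As a first step toward the caching idea, repeated queries can be answered from a small time-limited cache instead of re-running the agent. This is a minimal in-process sketch: `TTLCache`, the whitespace and case normalization, and the injectable `clock` are illustrative assumptions, and `compute` stands in for whatever function calls your agent.

```python
import time
from typing import Callable


class TTLCache:
    """Cache answers per normalized query for a limited time."""

    def __init__(self, ttl_seconds: float = 300.0, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries: dict[str, tuple[float, str]] = {}

    @staticmethod
    def _key(query: str) -> str:
        # Treat queries that differ only in case or spacing as the same
        return " ".join(query.lower().split())

    def get_or_compute(self, query: str, compute: Callable[[str], str]) -> str:
        """Return a fresh cached answer, or call compute(query) and cache the result."""
        key = self._key(query)
        now = self.clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        answer = compute(query)
        self._entries[key] = (now, answer)
        return answer
```

Caching suits stable questions such as arithmetic or definitions. Queries that depend on live search results need a short TTL or no caching at all, and a multi-worker deployment would need a shared store rather than a per-process dict.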