MODULE 3 - CHAPTER 3 ⏱️ 42 min read 📖 3,500 words

Multi-Agent Communication

Coordination patterns, message passing, and shared memory for collaborative AI teams

While single AI agents represent a significant leap forward, the true power of agentic systems emerges when multiple agents work together. However, coordination is the challenge: agents must communicate effectively, share knowledge, and avoid conflicts to achieve collective goals.

This chapter explores the fundamental communication patterns, state management strategies, and coordination techniques that enable multi-agent systems to function as cohesive, intelligent teams rather than isolated actors.

What You'll Learn

  • Why multi-agent systems outperform single agents for complex tasks
  • Direct vs indirect communication patterns
  • Implementing message passing protocols
  • Shared memory and scratchpad architectures
  • Conflict resolution and consensus mechanisms
  • Production-ready coordination patterns

1. Why Multi-Agent Systems?

Limitations of Single Agents

Despite their capabilities, single AI agents face inherent bottlenecks when confronted with highly complex, dynamic, or large-scale problems. Just as in human organizations, complex challenges often require the combined intelligence and specialized skills of a team.

Single Agent Bottlenecks
  • Scalability Challenges: A single agent becomes overwhelmed with too many tasks or increasing complexity
  • Limited Adaptability: If one agent fails or encounters an unexpected scenario, the entire system breaks down
  • Context Constraints: Even with large context windows, single agents can lose track of details in very long conversations
  • Cognitive Overload: For problems requiring diverse expertise (research, analysis, writing, coding), one agent may produce suboptimal solutions

Benefits of Multi-Agent Collaboration

Parallel Processing

Multiple agents can work on different subtasks simultaneously, which shortens overall completion time when the subtasks are independent of one another.

Example: One agent researches while another analyzes data and a third drafts the report.

Specialization

Each agent can be optimized for a specific domain, leading to higher quality outputs within their expertise area.

Example: A "Coder" agent excels at Python, while a "Tester" agent is an expert at finding bugs.

Robustness

The failure of one agent doesn't cripple the entire system. Other agents can continue or take over.

Example: If the web search agent fails, the system can still use cached data or alternative sources.
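The fallback idea in this example can be sketched in a few lines. The names below (`web_search_agent`, `cached_lookup`, `run_with_fallback`) are hypothetical stand-ins rather than part of any framework, and the outage is simulated by raising an error on purpose.

```python
from typing import Callable, List

def web_search_agent(query: str) -> str:
    """Hypothetical primary agent; always fails here to simulate an outage."""
    raise ConnectionError("search backend unavailable")

def cached_lookup(query: str) -> str:
    """Hypothetical fallback that answers from a small local cache."""
    cache = {"ai agents": "cached summary of AI agents"}
    return cache.get(query.lower(), "no cached data")

def run_with_fallback(query: str, handlers: List[Callable[[str], str]]) -> str:
    """Try each handler in order and return the first successful result."""
    errors = []
    for handler in handlers:
        try:
            return handler(query)
        except Exception as exc:  # a real system would catch narrower errors
            errors.append(f"{handler.__name__}: {exc}")
    raise RuntimeError("all handlers failed: " + "; ".join(errors))

answer = run_with_fallback("AI agents", [web_search_agent, cached_lookup])
print(answer)
```

Ordering the handlers from most to least preferred keeps the policy in one place; whether stale cached data is acceptable depends on the task.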

Diverse Perspectives

By simulating different roles or viewpoints, multi-agent systems can explore a wider range of solutions.

Example: A "Critic" agent challenges the "Proposer" agent's ideas, leading to better outcomes.

Example 1: Single Agent vs Multi-Agent Performance

import time
from typing import List

class PerformanceComparison:
    """Compare single agent vs multi-agent performance."""

    def single_agent_approach(self, tasks: List[str]) -> float:
        """Simulate single agent handling all tasks sequentially."""
        start = time.time()

        for task in tasks:
            # Simulate task execution (2 seconds per task)
            time.sleep(2)
            print(f"Single agent completed: {task}")

        duration = time.time() - start
        print(f"\n✓ Single agent total time: {duration:.1f}s")
        return duration

    def multi_agent_approach(self, tasks: List[str]) -> float:
        """Simulate multiple agents handling tasks in parallel."""
        import concurrent.futures

        start = time.time()

        def execute_task(task):
            time.sleep(2)  # Simulate task execution
            return f"Agent completed: {task}"

        # Execute tasks in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            results = list(executor.map(execute_task, tasks))

        for result in results:
            print(result)

        duration = time.time() - start
        print(f"\n✓ Multi-agent total time: {duration:.1f}s")
        return duration

# Usage
comparison = PerformanceComparison()

tasks = [
    "Research topic",
    "Analyze data",
    "Write report",
    "Review content",
    "Create visualizations"
]

print("="*60)
print("SINGLE AGENT APPROACH:")
print("="*60)
single_time = comparison.single_agent_approach(tasks)

print("\n" + "="*60)
print("MULTI-AGENT APPROACH:")
print("="*60)
multi_time = comparison.multi_agent_approach(tasks)

print("\n" + "="*60)
print(f"Speed improvement: {single_time / multi_time:.1f}x faster")
print("="*60)
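Result: With three workers and five 2-second tasks, the parallel run finishes in about 4 seconds (two rounds) instead of 10, roughly 2.5x faster. For parallelizable work the speedup is capped by the number of workers and by how many subtasks are truly independent. Threads suit this demo because the simulated work only waits, much like a real agent waiting on an API call.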
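The arithmetic behind a speedup estimate for Example 1 can be sketched as a small helper. It assumes equal-length, fully independent tasks and ignores scheduling overhead, so it gives an upper bound rather than a measurement; the function names are illustrative only.

```python
import math

def estimated_parallel_seconds(num_tasks: int, workers: int, seconds_per_task: float) -> float:
    """Estimate wall-clock time when equal-length tasks run in rounds of `workers`."""
    rounds = math.ceil(num_tasks / workers)
    return rounds * seconds_per_task

def estimated_speedup(num_tasks: int, workers: int, seconds_per_task: float) -> float:
    """Ratio of sequential time to estimated parallel time."""
    sequential = num_tasks * seconds_per_task
    return sequential / estimated_parallel_seconds(num_tasks, workers, seconds_per_task)

# Values from Example 1: five tasks, max_workers=3, 2 seconds each (10s vs 4s)
print(estimated_speedup(5, 3, 2.0))
# With one worker per task, every task runs in a single round (10s vs 2s)
print(estimated_speedup(5, 5, 2.0))
```

Under these assumptions, adding workers beyond the number of tasks does not help, and any task that depends on another's output reduces the achievable speedup further.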

2. Communication Paradigms

Agents can communicate in two primary ways: directly (message passing) or indirectly (shared memory). Each approach has distinct advantages and trade-offs.

Pattern 1: Direct Communication (Message Passing)

In message passing, agents explicitly send messages to one another, similar to how humans send emails or chat messages. This provides clear intent and direct addressing.

Message Passing Architecture

    ┌──────────┐                     ┌──────────┐
    │          │  ──── Message ───▶  │          │
    │ Agent A  │                     │ Agent B  │
    │          │  ◀─── Response ──   │          │
    └──────────┘                     └──────────┘
         │                                │
         │                                │
         └───── Message ───▶ ┌──────────┐ │
                             │          │
                             │ Agent C  │
                             │          │
                             └──────────┘

Example 2: Message Passing System

from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class Message:
    """Represents a message between agents."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    content: str = ""
    timestamp: datetime = field(default_factory=datetime.now)
    message_type: str = "text"  # text, request, response, broadcast, error
    metadata: Dict = field(default_factory=dict)

class MessageBus:
    """Central message broker for agent communication."""

    def __init__(self):
        self.messages: List[Message] = []
        self.agents: Dict[str, 'Agent'] = {}

    def register_agent(self, agent: 'Agent'):
        """Register an agent with the message bus."""
        self.agents[agent.name] = agent
        print(f"✓ Registered agent: {agent.name}")

    def send_message(self, message: Message):
        """Route message from sender to recipient."""
        self.messages.append(message)

        if message.recipient in self.agents:
            # Log first so the output reads in send order, then deliver
            print(f"📨 {message.sender} → {message.recipient}: {message.content[:50]}...")
            self.agents[message.recipient].receive_message(message)
        elif message.recipient == "broadcast":
            # Broadcast to all agents except sender
            print(f"📢 {message.sender} → ALL: {message.content[:50]}...")
            for agent_name, agent in self.agents.items():
                if agent_name != message.sender:
                    agent.receive_message(message)
        else:
            print(f"❌ Recipient not found: {message.recipient}")

    def get_conversation(self, agent1: str, agent2: str) -> List[Message]:
        """Retrieve conversation history between two agents."""
        return [
            msg for msg in self.messages
            if (msg.sender == agent1 and msg.recipient == agent2) or
               (msg.sender == agent2 and msg.recipient == agent1)
        ]

class Agent:
    """Base agent class with messaging capabilities."""

    def __init__(self, name: str, role: str, message_bus: MessageBus):
        self.name = name
        self.role = role
        self.message_bus = message_bus
        self.inbox: List[Message] = []

        # Register with message bus
        message_bus.register_agent(self)

    def send_message(self, recipient: str, content: str, message_type: str = "text"):
        """Send a message to another agent."""
        message = Message(
            sender=self.name,
            recipient=recipient,
            content=content,
            message_type=message_type
        )
        self.message_bus.send_message(message)

    def receive_message(self, message: Message):
        """Receive and process a message."""
        self.inbox.append(message)
        self.process_message(message)

    def process_message(self, message: Message):
        """Override this method to define how the agent processes messages."""
        print(f"  → {self.name} received: {message.content[:50]}...")

    def broadcast(self, content: str):
        """Broadcast message to all agents."""
        self.send_message("broadcast", content, "broadcast")

# Specialized agents
class ResearcherAgent(Agent):
    """Agent specialized in research."""

    def process_message(self, message: Message):
        """Process incoming research requests."""
        if "research" in message.content.lower():
            # Simulate research
            findings = f"Research findings on: {message.content}"
            self.send_message(message.sender, findings, "response")

class AnalystAgent(Agent):
    """Agent specialized in analysis."""

    def process_message(self, message: Message):
        """Process incoming analysis requests."""
        if "analyze" in message.content.lower():
            # Simulate analysis
            analysis = f"Analysis results: {message.content}"
            self.send_message(message.sender, analysis, "response")

class CoordinatorAgent(Agent):
    """Agent that coordinates other agents."""

    def process_message(self, message: Message):
        """Coordinate tasks across agents."""
        if message.message_type == "response":
            print(f"  → Coordinator received result from {message.sender}")
            # Could delegate to another agent or compile results

# Usage
print("="*60)
print("MULTI-AGENT MESSAGE PASSING DEMO")
print("="*60 + "\n")

# Create message bus
bus = MessageBus()

# Create agents
coordinator = CoordinatorAgent("Coordinator", "Project Manager", bus)
researcher = ResearcherAgent("Researcher", "Research Specialist", bus)
analyst = AnalystAgent("Analyst", "Data Analyst", bus)

# Coordinator delegates tasks
print("\n--- Task Delegation ---")
coordinator.send_message("Researcher", "Please research AI agent frameworks")
coordinator.send_message("Analyst", "Please analyze the research findings")

# Agents can communicate directly
print("\n--- Direct Communication ---")
researcher.send_message("Analyst", "Here are my research findings on LangGraph")

# Broadcast to all agents
print("\n--- Broadcast ---")
coordinator.broadcast("Team meeting in 5 minutes!")

# View conversation history
print("\n--- Conversation History ---")
conversation = bus.get_conversation("Coordinator", "Researcher")
print(f"Messages between Coordinator and Researcher: {len(conversation)}")
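
Example 2 delivers each message synchronously inside `send_message`, so a reply is processed before the original send has returned. A common alternative is to give every agent an inbox queue and let it drain messages when it is ready. The sketch below uses the standard library's `queue.Queue`; `Envelope`, `Mailbox`, `post`, and `drain` are hypothetical names chosen for illustration, not part of Example 2.

```python
import queue
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class Envelope:
    sender: str
    recipient: str
    content: str

class Mailbox:
    """Hypothetical broker that queues messages instead of delivering them immediately."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, queue.Queue] = {}

    def register(self, name: str) -> None:
        self.inboxes[name] = queue.Queue()

    def post(self, envelope: Envelope) -> bool:
        """Enqueue a message; return False if the recipient is unknown."""
        inbox = self.inboxes.get(envelope.recipient)
        if inbox is None:
            return False
        inbox.put(envelope)
        return True

    def drain(self, name: str) -> List[Envelope]:
        """Remove and return every message currently waiting for `name`."""
        inbox = self.inboxes[name]
        received = []
        while True:
            try:
                received.append(inbox.get_nowait())
            except queue.Empty:
                return received

mailbox = Mailbox()
mailbox.register("Researcher")
mailbox.register("Analyst")
mailbox.post(Envelope("Coordinator", "Researcher", "Please research AI agent frameworks"))
mailbox.post(Envelope("Coordinator", "Researcher", "Also check licensing"))
delivered = mailbox.post(Envelope("Coordinator", "Nobody", "Is anyone there?"))
pending = mailbox.drain("Researcher")
print(len(pending), delivered)
```

Queuing decouples when a message is sent from when it is handled, at the cost of needing some loop or scheduler that calls `drain` for each agent.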

Pattern 2: Indirect Communication (Shared Memory)

In shared memory architectures, agents interact by reading from and writing to a common memory space, like a whiteboard or shared document. This decouples agents and simplifies communication patterns.

Shared Memory Architecture

    ┌──────────┐        ┌──────────┐        ┌──────────┐
    │ Agent A  │        │ Agent B  │        │ Agent C  │
    └──────────┘        └──────────┘        └──────────┘
         │ write             │ read              │ write
         │                   │                   │
         ▼                   ▼                   ▼
    ┌────────────────────────────────────────────────┐
    │         SHARED MEMORY (Scratchpad)             │
    │  • Current goal                                │
    │  • Intermediate results                        │
    │  • Agent observations                          │
    │  • Shared context                              │
    └────────────────────────────────────────────────┘

Example 3: Shared Scratchpad System

from typing import Any, Dict, List, Optional
from datetime import datetime
import threading
import json

class SharedScratchpad:
    """Thread-safe shared memory for agent collaboration."""

    def __init__(self):
        self.data: Dict[str, Any] = {}
        self.history: List[Dict] = []
        self.lock = threading.Lock()

    def write(self, key: str, value: Any, agent_name: str):
        """Write data to scratchpad with locking."""
        with self.lock:
            # Record history
            self.history.append({
                "timestamp": datetime.now().isoformat(),
                "agent": agent_name,
                "action": "write",
                "key": key,
                "value": value
            })

            # Update data
            self.data[key] = {
                "value": value,
                "updated_by": agent_name,
                "updated_at": datetime.now().isoformat()
            }

            print(f"✏️  {agent_name} wrote to '{key}': {str(value)[:50]}...")

    def read(self, key: str, agent_name: str) -> Optional[Any]:
        """Read data from scratchpad."""
        with self.lock:
            # Record history
            self.history.append({
                "timestamp": datetime.now().isoformat(),
                "agent": agent_name,
                "action": "read",
                "key": key
            })

            if key in self.data:
                value = self.data[key]["value"]
                print(f"📖 {agent_name} read from '{key}': {str(value)[:50]}...")
                return value
            else:
                print(f"❌ {agent_name} tried to read missing key: '{key}'")
                return None

    def read_all(self, agent_name: str) -> Dict[str, Any]:
        """Read all data from scratchpad."""
        with self.lock:
            return {k: v["value"] for k, v in self.data.items()}

    def get_history(self) -> List[Dict]:
        """Get a copy of the full operation history."""
        with self.lock:
            return list(self.history)

    def export_state(self) -> str:
        """Export current state as JSON (values must be JSON-serializable)."""
        with self.lock:
            return json.dumps(self.data, indent=2)

class ScratchpadAgent:
    """Agent that uses shared scratchpad for communication."""

    def __init__(self, name: str, role: str, scratchpad: SharedScratchpad):
        self.name = name
        self.role = role
        self.scratchpad = scratchpad

    def write_result(self, key: str, value: Any):
        """Write result to shared scratchpad."""
        self.scratchpad.write(key, value, self.name)

    def read_result(self, key: str) -> Optional[Any]:
        """Read result from shared scratchpad."""
        return self.scratchpad.read(key, self.name)

    def get_all_context(self) -> Dict[str, Any]:
        """Get all available context."""
        return self.scratchpad.read_all(self.name)

# Specialized scratchpad agents
class DataCollectorAgent(ScratchpadAgent):
    """Collects data and writes to scratchpad."""

    def collect_data(self):
        """Simulate data collection."""
        print(f"\n{self.name} collecting data...")

        # Simulate collecting different types of data
        self.write_result("user_count", 1250)
        self.write_result("revenue", 450000)
        self.write_result("active_projects", ["Project A", "Project B", "Project C"])

class AnalysisAgent(ScratchpadAgent):
    """Reads data and performs analysis."""

    def analyze(self):
        """Analyze data from scratchpad."""
        print(f"\n{self.name} performing analysis...")

        # Read data from scratchpad
        user_count = self.read_result("user_count")
        revenue = self.read_result("revenue")

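        # Compare revenue against None so a legitimate 0 is not treated as missing;
        # a zero or missing user count is skipped to avoid dividing by zero
        if user_count and revenue is not None:
            # Perform calculation
            revenue_per_user = revenue / user_count
            self.write_result("revenue_per_user", revenue_per_user)
            print(f"  → Calculated revenue per user: ${revenue_per_user:.2f}")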

class ReportAgent(ScratchpadAgent):
    """Generates reports from scratchpad data."""

    def generate_report(self):
        """Generate report from all scratchpad data."""
        print(f"\n{self.name} generating report...")

        # Read all data
        context = self.get_all_context()

        # Generate report
        report = "="*60 + "\n"
        report += "ANALYTICS REPORT\n"
        report += "="*60 + "\n\n"

        for key, value in context.items():
            report += f"{key.replace('_', ' ').title()}: {value}\n"

        self.write_result("final_report", report)
        print(report)

# Usage
print("="*60)
print("MULTI-AGENT SHARED SCRATCHPAD DEMO")
print("="*60)

# Create shared scratchpad
scratchpad = SharedScratchpad()

# Create agents
collector = DataCollectorAgent("DataCollector", "Data Specialist", scratchpad)
analyst = AnalysisAgent("Analyst", "Data Analyst", scratchpad)
reporter = ReportAgent("Reporter", "Report Generator", scratchpad)

# Agents work in sequence, reading/writing to shared memory
collector.collect_data()
analyst.analyze()
reporter.generate_report()

# View operation history
print("\n" + "="*60)
print("OPERATION HISTORY")
print("="*60)
history = scratchpad.get_history()
for i, op in enumerate(history, 1):
    print(f"{i}. {op['timestamp'][:19]} | {op['agent']:15} | {op['action']:5} | {op['key']}")

Comparison: Message Passing vs Shared Memory

Aspect        | Message Passing                     | Shared Memory
Coupling      | Tight (agents must know recipients) | Loose (agents read/write freely)
Scalability   | Complex with many agents            | Scales well
Debugging     | Easy to trace messages              | Requires history tracking
Conflict Risk | Low (explicit recipients)           | High (simultaneous writes)
Best For      | Direct collaboration, workflows     | Large teams, loose coordination
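
The conflict risk of shared memory is worth making concrete. Example 3 locks each `read` and each `write` individually, so an agent that reads a value, computes on it, and writes it back could still overwrite another agent's update made in between. A minimal sketch of the usual remedy, holding the lock across the whole read-modify-write, follows; `SharedCounter` and `agent_work` are hypothetical names used only for this illustration.

```python
import threading

class SharedCounter:
    """Hypothetical shared value that several agents increment."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # Hold the lock across the whole read-modify-write, not just the write
        with self._lock:
            current = self.value
            self.value = current + 1

def agent_work(counter: SharedCounter, updates: int) -> None:
    """Simulate one agent applying a series of updates."""
    for _ in range(updates):
        counter.increment()

counter = SharedCounter()
threads = [threading.Thread(target=agent_work, args=(counter, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)
```

Because every increment happens entirely under the lock, the four simulated agents always end at 4000; without the lock, updates could be lost depending on thread scheduling.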

3. State Management in Multi-Agent Systems

State management is the process of tracking and updating the overall condition and progress of the multi-agent system. It ensures continuity and coherence across long-running tasks.

Persistence Architecture

For production systems, state must be persisted to databases to survive restarts, enable auditing, and support long-running workflows.

Example 4: Persistent State Manager

import sqlite3
import json
from typing import Any, Dict, List, Optional
from datetime import datetime

class PersistentStateManager:
    """Database-backed state management for multi-agent systems."""

    def __init__(self, db_path: str = "agent_state.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize database schema."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # State table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS state (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_by TEXT NOT NULL,
                updated_at TIMESTAMP NOT NULL,
                version INTEGER NOT NULL DEFAULT 1
            )
        """)

        # History table (for audit trail)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS state_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                updated_by TEXT NOT NULL,
                updated_at TIMESTAMP NOT NULL,
                action TEXT NOT NULL
            )
        """)

        # Checkpoint table (for workflow snapshots)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS checkpoints (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                state_snapshot TEXT NOT NULL,
                created_at TIMESTAMP NOT NULL,
                description TEXT
            )
        """)

        conn.commit()
        conn.close()

    def set_state(self, key: str, value: Any, agent_name: str):
        """Set state value with versioning."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        value_json = json.dumps(value)
        timestamp = datetime.now().isoformat()

        # Check if key exists
        cursor.execute("SELECT version FROM state WHERE key = ?", (key,))
        result = cursor.fetchone()

        if result:
            # Update existing
            new_version = result[0] + 1
            cursor.execute("""
                UPDATE state
                SET value = ?, updated_by = ?, updated_at = ?, version = ?
                WHERE key = ?
            """, (value_json, agent_name, timestamp, new_version, key))
        else:
            # Insert new
            cursor.execute("""
                INSERT INTO state (key, value, updated_by, updated_at, version)
                VALUES (?, ?, ?, ?, 1)
            """, (key, value_json, agent_name, timestamp))

        # Add to history
        cursor.execute("""
            INSERT INTO state_history (key, value, updated_by, updated_at, action)
            VALUES (?, ?, ?, ?, 'set')
        """, (key, value_json, agent_name, timestamp))

        conn.commit()
        conn.close()

        print(f"✓ State saved: {key} = {str(value)[:50]}... (by {agent_name})")

    def get_state(self, key: str) -> Optional[Any]:
        """Get state value."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("SELECT value FROM state WHERE key = ?", (key,))
        result = cursor.fetchone()
        conn.close()

        if result:
            return json.loads(result[0])
        return None

    def get_all_state(self) -> Dict[str, Any]:
        """Get all state values."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("SELECT key, value FROM state")
        results = cursor.fetchall()
        conn.close()

        return {key: json.loads(value) for key, value in results}

    def create_checkpoint(self, name: str, description: str = ""):
        """Create a checkpoint of current state."""
        current_state = self.get_all_state()
        state_snapshot = json.dumps(current_state, indent=2)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            INSERT INTO checkpoints (name, state_snapshot, created_at, description)
            VALUES (?, ?, ?, ?)
        """, (name, state_snapshot, datetime.now().isoformat(), description))

        checkpoint_id = cursor.lastrowid
        conn.commit()
        conn.close()

        print(f"✓ Checkpoint created: {name} (ID: {checkpoint_id})")
        return checkpoint_id

    def restore_checkpoint(self, checkpoint_id: int):
        """Restore state from a checkpoint."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT state_snapshot FROM checkpoints WHERE id = ?
        """, (checkpoint_id,))

        result = cursor.fetchone()
        conn.close()

        if not result:
            print(f"❌ Checkpoint {checkpoint_id} not found")
            return False

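        # Restore state
        state_snapshot = json.loads(result[0])

        # Drop keys created after the checkpoint so the restore is exact
        stale_keys = [k for k in self.get_all_state() if k not in state_snapshot]
        if stale_keys:
            conn = sqlite3.connect(self.db_path)
            conn.executemany("DELETE FROM state WHERE key = ?", [(k,) for k in stale_keys])
            conn.commit()
            conn.close()

        for key, value in state_snapshot.items():
            self.set_state(key, value, "system_restore")

        print(f"✓ State restored from checkpoint {checkpoint_id}")
        return True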

    def get_history(self, key: Optional[str] = None) -> List[Dict]:
        """Get state change history."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        if key:
            cursor.execute("""
                SELECT * FROM state_history WHERE key = ? ORDER BY updated_at DESC
            """, (key,))
        else:
            cursor.execute("""
                SELECT * FROM state_history ORDER BY updated_at DESC LIMIT 50
            """)

        results = cursor.fetchall()
        conn.close()

        return [
            {
                "id": row[0],
                "key": row[1],
                "value": json.loads(row[2]),
                "updated_by": row[3],
                "updated_at": row[4],
                "action": row[5]
            }
            for row in results
        ]

# Usage
print("="*60)
print("PERSISTENT STATE MANAGEMENT DEMO")
print("="*60 + "\n")

# Create state manager
state_mgr = PersistentStateManager("demo_state.db")

# Agents write to state
print("--- Agents Writing State ---")
state_mgr.set_state("task_status", "in_progress", "Agent1")
state_mgr.set_state("progress_percentage", 25, "Agent1")
state_mgr.set_state("errors_found", [], "Agent2")

# Create checkpoint
print("\n--- Creating Checkpoint ---")
checkpoint_id = state_mgr.create_checkpoint("after_initial_setup", "State after agents started")

# More updates
print("\n--- Continuing Work ---")
state_mgr.set_state("progress_percentage", 50, "Agent1")
state_mgr.set_state("errors_found", ["Error in line 42"], "Agent2")

# View current state
print("\n--- Current State ---")
current = state_mgr.get_all_state()
print(json.dumps(current, indent=2))

# View history
print("\n--- State History ---")
history = state_mgr.get_history()
for entry in history[:5]:
    print(f"{entry['updated_at'][:19]} | {entry['updated_by']:10} | {entry['key']:20} | {entry['action']}")

# Restore checkpoint
print("\n--- Restoring Checkpoint ---")
state_mgr.restore_checkpoint(checkpoint_id)

# View restored state
print("\n--- Restored State ---")
restored = state_mgr.get_all_state()
print(json.dumps(restored, indent=2))
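
In Example 4, `set_state` first selects the current version and then issues a separate update, which leaves a window in which two agents could read the same version. One way to close that window is a single UPSERT statement. The sketch below assumes SQLite 3.24 or newer (where `ON CONFLICT ... DO UPDATE` is available), uses an in-memory database, and simplifies the schema by omitting `updated_at`; `upsert_state` is a hypothetical helper, not part of the example class.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory database, for illustration only
conn.execute("""
    CREATE TABLE state (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL,
        updated_by TEXT NOT NULL,
        version INTEGER NOT NULL DEFAULT 1
    )
""")

def upsert_state(key: str, value, agent_name: str) -> int:
    """Insert or update a key in one statement and return its new version."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            """
            INSERT INTO state (key, value, updated_by, version)
            VALUES (?, ?, ?, 1)
            ON CONFLICT(key) DO UPDATE SET
                value = excluded.value,
                updated_by = excluded.updated_by,
                version = version + 1
            """,
            (key, json.dumps(value), agent_name),
        )
    row = conn.execute("SELECT version FROM state WHERE key = ?", (key,)).fetchone()
    return row[0]

print(upsert_state("progress_percentage", 25, "Agent1"))  # first write: version 1
print(upsert_state("progress_percentage", 50, "Agent1"))  # second write: version 2
```

Letting the database increment the version inside the statement means the application never has to compute it from a possibly stale read.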

4. Conflict Resolution and Consensus

When multiple agents can write to shared state, conflicts can arise. Production systems need robust strategies to handle this.
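
A small sketch shows how such a conflict arises and how it can be detected: two agents read the same value, and the second writer is rejected because the value has changed since it read. The `VersionedStore` class and its method names are hypothetical and kept in memory for brevity; this is one possible approach (optimistic concurrency with version numbers), not the only one.

```python
import threading
from typing import Any, Dict, Tuple

class VersionedStore:
    """Hypothetical in-memory store that rejects writes based on stale reads."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[Any, int]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Tuple[Any, int]:
        """Return (value, version); a missing key reads as (None, 0)."""
        with self._lock:
            return self._data.get(key, (None, 0))

    def write_if_version(self, key: str, value: Any, expected_version: int) -> bool:
        """Write only if nobody else has written since `expected_version` was read."""
        with self._lock:
            _, current_version = self._data.get(key, (None, 0))
            if current_version != expected_version:
                return False  # conflict: the caller should re-read and retry
            self._data[key] = (value, current_version + 1)
            return True

store = VersionedStore()
_, version_seen_by_a = store.read("summary")  # Agent A reads version 0
_, version_seen_by_b = store.read("summary")  # Agent B also reads version 0
a_ok = store.write_if_version("summary", "draft from A", version_seen_by_a)
b_ok = store.write_if_version("summary", "draft from B", version_seen_by_b)
print(a_ok, b_ok)
```

Agent B's write fails rather than silently replacing Agent A's draft, which gives B the chance to re-read, merge, and try again.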

Conflict Resolution Strategies

Common Conflict Resolution Approaches
  • Atomic Operations: Updates are indivisible: they either fully succeed or fully fail
  • Locking Mechanisms: Only one agent can write to a resource at a time
  • Version Control: Track all changes and allow rollbacks if conflicts occur
  • Consensus Protocols: Agents vote on decisions, requiring majority or unanimous agreement
  • Priority-Based: Certain agents have higher priority in case of conflicts

Example 5: Consensus Mechanism

from typing import Any, Dict, List, Optional  # Optional is used by finalize_decision
from collections import Counter

class ConsensusManager:
    """Manage consensus-based decision making among agents."""

    def __init__(self, required_threshold: float = 0.67):
        self.required_threshold = required_threshold  # 67% agreement needed
        self.proposals: Dict[str, List[Dict]] = {}

    def submit_vote(self, proposal_id: str, agent_name: str, vote: Any, reasoning: str = ""):
        """Agent submits a vote on a proposal."""
        if proposal_id not in self.proposals:
            self.proposals[proposal_id] = []

        self.proposals[proposal_id].append({
            "agent": agent_name,
            "vote": vote,
            "reasoning": reasoning
        })

        print(f"📋 {agent_name} voted: {vote} (Proposal: {proposal_id})")

    def check_consensus(self, proposal_id: str) -> Dict[str, Any]:
        """Check if consensus has been reached."""
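        if proposal_id not in self.proposals:
            # Include agreement_percentage so finalize_decision can still report it
            return {"consensus_reached": False, "reason": "No votes yet",
                    "agreement_percentage": 0.0, "total_votes": 0}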

        votes = self.proposals[proposal_id]
        total_votes = len(votes)

        # Count votes
        vote_counts = Counter([v["vote"] for v in votes])
        most_common_vote, count = vote_counts.most_common(1)[0]

        # Calculate agreement percentage
        agreement_percentage = count / total_votes

        consensus_reached = agreement_percentage >= self.required_threshold

        return {
            "consensus_reached": consensus_reached,
            "agreed_value": most_common_vote if consensus_reached else None,
            "agreement_percentage": agreement_percentage,
            "total_votes": total_votes,
            "vote_breakdown": dict(vote_counts),
            "votes": votes
        }

    def finalize_decision(self, proposal_id: str) -> Optional[Any]:
        """Finalize decision if consensus reached."""
        result = self.check_consensus(proposal_id)

        if result["consensus_reached"]:
            print(f"\n✅ CONSENSUS REACHED on '{proposal_id}'")
            print(f"   Decision: {result['agreed_value']}")
            print(f"   Agreement: {result['agreement_percentage']*100:.1f}%")
            return result["agreed_value"]
        else:
            print(f"\n❌ NO CONSENSUS on '{proposal_id}'")
            print(f"   Current agreement: {result['agreement_percentage']*100:.1f}%")
            print(f"   Required: {self.required_threshold*100:.1f}%")
            return None

# Usage
print("="*60)
print("CONSENSUS-BASED DECISION MAKING")
print("="*60 + "\n")

consensus_mgr = ConsensusManager(required_threshold=0.67)

# Proposal: Which framework to use?
proposal_id = "framework_choice"

print("--- Agents Voting ---")
consensus_mgr.submit_vote(proposal_id, "Agent1", "LangGraph", "Need explicit control flow")
consensus_mgr.submit_vote(proposal_id, "Agent2", "LangGraph", "Better for complex workflows")
consensus_mgr.submit_vote(proposal_id, "Agent3", "CrewAI", "Prefer role-based approach")
consensus_mgr.submit_vote(proposal_id, "Agent4", "LangGraph", "Strong state management")

# Check consensus
print("\n--- Checking Consensus ---")
decision = consensus_mgr.finalize_decision(proposal_id)

if decision:
    print(f"\n🎯 Team will use: {decision}")
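
Example 5 counts every submitted vote, so an agent that votes twice adds weight twice, and it measures agreement against the votes cast rather than against the agents who could vote. A possible variation that addresses both points is sketched below; `QuorumVote` and its fields are hypothetical names, and measuring against the full electorate is a design assumption rather than the only reasonable rule.

```python
from collections import Counter
from typing import Any, Dict, Optional, Set

class QuorumVote:
    """Hypothetical tally: one vote per agent, threshold measured against all eligible agents."""

    def __init__(self, eligible_agents: Set[str], threshold: float = 0.67) -> None:
        self.eligible_agents = set(eligible_agents)
        self.threshold = threshold
        self.votes: Dict[str, Any] = {}

    def submit_vote(self, agent_name: str, vote: Any) -> None:
        if agent_name not in self.eligible_agents:
            raise ValueError(f"{agent_name} is not eligible to vote")
        self.votes[agent_name] = vote  # a repeat vote replaces the earlier one

    def decision(self) -> Optional[Any]:
        """Return the winning value, or None while no option has enough support."""
        if not self.votes:
            return None
        value, count = Counter(self.votes.values()).most_common(1)[0]
        if count / len(self.eligible_agents) >= self.threshold:
            return value
        return None

poll = QuorumVote({"Agent1", "Agent2", "Agent3", "Agent4"})
poll.submit_vote("Agent1", "LangGraph")
poll.submit_vote("Agent2", "LangGraph")
early = poll.decision()                  # 2 of 4 eligible agents: below 67%
poll.submit_vote("Agent2", "LangGraph")  # duplicate vote does not add weight
poll.submit_vote("Agent4", "LangGraph")
final = poll.decision()                  # 3 of 4 eligible agents: 75%
print(early, final)
```

Keying votes by agent name makes re-voting a correction instead of a second ballot, and dividing by the electorate prevents a decision from passing on a handful of early votes.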

5. Production Coordination Patterns

Pattern 1: Coordinator-Worker

A coordinator agent delegates tasks to worker agents and aggregates results.

Example 6: Coordinator-Worker Pattern

import time
from typing import Any, Dict, List

class WorkerAgent:
    """Worker agent that executes assigned tasks."""

    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty

    def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute assigned task."""
        print(f"  → {self.name} ({self.specialty}) executing: {task['description']}")

        # Simulate work
        time.sleep(1)

        return {
            "worker": self.name,
            "task_id": task["id"],
            "result": f"Completed: {task['description']}",
            "status": "success"
        }

class CoordinatorAgent:
    """Coordinator that delegates to workers."""

    def __init__(self, name: str, workers: List[WorkerAgent]):
        self.name = name
        self.workers = workers
        self.task_queue = []
        self.results = []

    def assign_task(self, task: Dict[str, Any]):
        """Assign task to appropriate worker."""
        # Find worker with matching specialty
        for worker in self.workers:
            if worker.specialty == task.get("specialty"):
                result = worker.execute_task(task)
                self.results.append(result)
                return result

        # If no specialty match, assign to first available worker
        result = self.workers[0].execute_task(task)
        self.results.append(result)
        return result

    def orchestrate(self, tasks: List[Dict[str, Any]]):
        """Orchestrate all tasks."""
        print(f"\n{self.name} orchestrating {len(tasks)} tasks...")

        for task in tasks:
            self.assign_task(task)

        print(f"\n✓ All tasks completed. Results collected: {len(self.results)}")

# Usage
print("\n" + "="*60)
print("COORDINATOR-WORKER PATTERN")
print("="*60)

# Create workers
workers = [
    WorkerAgent("Worker1", "research"),
    WorkerAgent("Worker2", "coding"),
    WorkerAgent("Worker3", "testing")
]

# Create coordinator
coordinator = CoordinatorAgent("MainCoordinator", workers)

# Define tasks
tasks = [
    {"id": 1, "description": "Research AI frameworks", "specialty": "research"},
    {"id": 2, "description": "Implement agent system", "specialty": "coding"},
    {"id": 3, "description": "Test implementation", "specialty": "testing"}
]

# Orchestrate
coordinator.orchestrate(tasks)
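
Example 6 runs its tasks one after another. When tasks are independent, a coordinator can dispatch them concurrently instead. The sketch below is one way to do that with the standard library's `ThreadPoolExecutor`; `run_worker` is a hypothetical stand-in for `WorkerAgent.execute_task`, and the task descriptions are reused only as labels, since real research, implementation, and testing steps would normally depend on each other.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

def run_worker(task: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for WorkerAgent.execute_task."""
    time.sleep(0.2)  # simulate I/O-bound work such as an API call
    return {
        "task_id": task["id"],
        "result": f"Completed: {task['description']}",
        "status": "success",
    }

def orchestrate_concurrently(tasks: List[Dict[str, Any]], max_workers: int = 3) -> List[Dict[str, Any]]:
    """Dispatch all tasks at once and return results in the original task order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(run_worker, tasks))

demo_tasks = [
    {"id": 1, "description": "Research AI frameworks"},
    {"id": 2, "description": "Implement agent system"},
    {"id": 3, "description": "Test implementation"},
]
start = time.perf_counter()
results = orchestrate_concurrently(demo_tasks)
elapsed = time.perf_counter() - start
print([r["task_id"] for r in results], f"{elapsed:.1f}s")
```

`executor.map` preserves input order, so the coordinator can aggregate results without tracking which worker finished first.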

Pattern 2: Pipeline

Agents form a pipeline where output of one becomes input to the next.

Example 7: Agent Pipeline

from typing import Any, Dict, List

class PipelineStage:
    """Base class for pipeline stages."""

    def __init__(self, name: str):
        self.name = name

    def process(self, input_data: Any) -> Any:
        """Process data (override in subclasses)."""
        raise NotImplementedError

class DataCollectionStage(PipelineStage):
    """Stage 1: Collect data."""

    def process(self, input_data: Any) -> Dict[str, Any]:
        print(f"  Stage 1 ({self.name}): Collecting data...")
        return {
            "raw_data": ["data1", "data2", "data3"],
            "timestamp": "2025-01-01",
            "source": "API"
        }

class DataProcessingStage(PipelineStage):
    """Stage 2: Process data."""

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"  Stage 2 ({self.name}): Processing data...")
        raw_data = input_data["raw_data"]

        return {
            **input_data,
            "processed_data": [d.upper() for d in raw_data],
            "count": len(raw_data)
        }

class ReportGenerationStage(PipelineStage):
    """Stage 3: Generate report."""

    def process(self, input_data: Dict[str, Any]) -> str:
        print(f"  Stage 3 ({self.name}): Generating report...")

        report = f"""
        DATA REPORT
        ===========
        Source: {input_data['source']}
        Timestamp: {input_data['timestamp']}
        Data Count: {input_data['count']}
        Processed Data: {', '.join(input_data['processed_data'])}
        """
        return report

class AgentPipeline:
    """Execute agents in a pipeline."""

    def __init__(self, stages: List[PipelineStage]):
        self.stages = stages

    def execute(self, initial_input: Any = None) -> Any:
        """Execute pipeline."""
        print("="*60)
        print("PIPELINE EXECUTION")
        print("="*60)

        data = initial_input

        for stage in self.stages:
            data = stage.process(data)

        print("\n✓ Pipeline completed")
        return data

# Usage
pipeline = AgentPipeline([
    DataCollectionStage("DataCollector"),
    DataProcessingStage("DataProcessor"),
    ReportGenerationStage("ReportGenerator")
])

result = pipeline.execute()
print(f"\nFinal Output:\n{result}")

Key Takeaways

  • Multi-Agent Benefits: Parallel processing, specialization, robustness, and diverse perspectives
  • Communication Patterns: Choose message passing for explicit coordination or shared memory for loose coupling
  • State Management: Use persistent databases for production systems requiring audit trails and recovery
  • Conflict Resolution: Implement consensus mechanisms or locking to handle simultaneous writes
  • Coordination Patterns: Coordinator-worker and pipeline patterns are common in production
  • Best Practices: Always implement versioning, history tracking, and checkpointing for complex systems

Further Reading