Multi-Agent Orchestration
Requires: ARAL-ORCH profile (Layers 1-6) for team coordination and orchestration
This example demonstrates how to orchestrate multiple ARAL agents working together on complex tasks that require coordination, consensus, and delegated workflows.
Scenario: Customer Support Team
We'll build a team of three specialized agents:
- Router Agent: Classifies incoming requests
- Technical Agent: Handles technical issues
- Billing Agent: Handles billing issues
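As a rough illustration of what the Router Agent's classification step could look like, here is a minimal keyword-based sketch. The `ROUTES` table, the keyword lists, and the `classify` function are hypothetical names introduced for this example only; a real router would more likely rely on a trained classifier or a language model.

```python
from typing import Dict, List

# Hypothetical keyword lists, chosen only for illustration.
ROUTES: Dict[str, List[str]] = {
    "tech": ["error", "crash", "bug", "login", "timeout"],
    "billing": ["invoice", "refund", "charge", "payment"],
}


def classify(request: str, default: str = "tech") -> str:
    """Return the id of the agent whose keywords match the request most often."""
    text = request.lower()
    hits = {
        agent_id: sum(word in text for word in words)
        for agent_id, words in ROUTES.items()
    }
    best = max(hits, key=hits.get)
    # Fall back to the default agent when no keyword matches at all.
    return best if hits[best] > 0 else default


if __name__ == "__main__":
    print(classify("I was charged twice, please refund me"))  # billing
    print(classify("The app crashes with a timeout error"))   # tech
```

The returned id matches the agent ids used later in this example (`tech`, `billing`), so the result could be used to pick the `agent_id` of a workflow step.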
Architecture
Complete Implementation
import { EventEmitter } from 'events';

// ==========================================
// AGENT DEFINITION
// ==========================================
interface AgentConfig {
  id: string;
  name: string;
  role: string;
  specialization: string;
  capabilities: string[];
}

class OrchestrationAgent {
  private id: string;
  private name: string;
  private role: string;
  private specialization: string;
  private capabilities: string[];
  private memory: Map<string, any> = new Map();

  constructor(config: AgentConfig) {
    this.id = config.id;
    this.name = config.name;
    this.role = config.role;
    this.specialization = config.specialization;
    this.capabilities = config.capabilities;
  }

  getId(): string { return this.id; }
  getName(): string { return this.name; }
  getRole(): string { return this.role; }
  getSpecialization(): string { return this.specialization; }
  getCapabilities(): string[] { return this.capabilities; }

  async process(input: string): Promise<{ analysis: string; score: number }> {
    // Simulate agent processing
    return {
      analysis: `[${this.name}] Analyzed: ${input}`,
      score: Math.random() * 100
    };
  }

  async storeMemory(key: string, value: any): Promise<void> {
    this.memory.set(key, { value, timestamp: Date.now() });
  }

  async recallMemory(key: string): Promise<any> {
    return this.memory.get(key)?.value;
  }
}

// ==========================================
// ORCHESTRATION STRATEGIES
// ==========================================
type WorkflowStrategy = 'sequential' | 'parallel' | 'consensus' | 'hierarchical';

interface WorkflowStep {
  agentId: string;
  task: string;
  priority: number;
}

interface WorkflowPlan {
  id: string;
  steps: WorkflowStep[];
  strategy: WorkflowStrategy;
  expectedDuration: number;
}

// ==========================================
// ORCHESTRATOR
// ==========================================
class TeamOrchestrator extends EventEmitter {
  private agents: Map<string, OrchestrationAgent> = new Map();
  private activeWorkflows: Map<string, WorkflowPlan> = new Map();
  private executionResults: Map<string, any> = new Map();

  addAgent(agent: OrchestrationAgent): void {
    this.agents.set(agent.getId(), agent);
    this.emit('agent:added', agent.getId());
  }

  removeAgent(agentId: string): void {
    this.agents.delete(agentId);
    this.emit('agent:removed', agentId);
  }

  getAgent(agentId: string): OrchestrationAgent | undefined {
    return this.agents.get(agentId);
  }

  listAgents(): OrchestrationAgent[] {
    return Array.from(this.agents.values());
  }

  // SEQUENTIAL EXECUTION
  async executeSequential(plan: WorkflowPlan): Promise<any[]> {
    console.log(`📋 Executing sequential plan: ${plan.id}`);
    const results: any[] = [];

    // Sort a copy so the caller's plan is not mutated
    const orderedSteps = [...plan.steps].sort((a, b) => a.priority - b.priority);

    for (const step of orderedSteps) {
      const agent = this.agents.get(step.agentId);
      if (!agent) throw new Error(`Agent ${step.agentId} not found`);

      console.log(`  → ${agent.getName()}: ${step.task}`);
      const result = await agent.process(step.task);
      results.push(result);

      // Record the result in this agent's own memory
      // (it is not forwarded to the next agent automatically)
      await agent.storeMemory('last_result', result);
    }

    return results;
  }

  // PARALLEL EXECUTION
  async executeParallel(plan: WorkflowPlan): Promise<any[]> {
    console.log(`⚡ Executing parallel plan: ${plan.id}`);

    const promises = plan.steps.map(step => {
      const agent = this.agents.get(step.agentId);
      if (!agent) throw new Error(`Agent ${step.agentId} not found`);

      console.log(`  → ${agent.getName()}: ${step.task}`);
      return agent.process(step.task);
    });

    return Promise.all(promises);
  }

  // CONSENSUS EXECUTION
  async executeConsensus(plan: WorkflowPlan, threshold: number = 0.7): Promise<any> {
    console.log(`🤝 Executing consensus plan: ${plan.id}`);

    const results = await this.executeParallel(plan);
    const scores: number[] = results.map((r: any) => r.score);
    // Guard against an empty plan, which would otherwise yield NaN
    const averageScore = scores.length
      ? scores.reduce((a, b) => a + b, 0) / scores.length
      : 0;

    // Agreement is approximated as the mean score scaled to the range 0..1
    const consensus = {
      averageScore,
      agreementLevel: averageScore / 100,
      passed: (averageScore / 100) >= threshold,
      details: results
    };

    console.log(`  Consensus: ${(consensus.agreementLevel * 100).toFixed(1)}% agreement`);
    return consensus;
  }

  // HIERARCHICAL EXECUTION
  async executeHierarchical(plan: WorkflowPlan, rootAgentId?: string): Promise<any> {
    console.log(`🏛️ Executing hierarchical plan: ${plan.id}`);

    // Default to the first step's agent when no root is given
    const rootId = rootAgentId || plan.steps[0]?.agentId;
    const rootAgent = rootId ? this.agents.get(rootId) : undefined;

    if (!rootAgent) throw new Error('Root agent not found');

    // Root agent delegates to sub-agents, which run concurrently
    const delegatedTasks = plan.steps.filter(s => s.agentId !== rootAgent.getId());
    const subResults = await Promise.all(
      delegatedTasks.map(step => {
        const agent = this.agents.get(step.agentId);
        // Fail fast instead of passing undefined results to the synthesis step
        if (!agent) throw new Error(`Agent ${step.agentId} not found`);
        return agent.process(step.task);
      })
    );

    // Root agent synthesizes results
    const synthesis = await rootAgent.process(
      `Synthesize: ${subResults.map(r => r.analysis).join(' | ')}`
    );

    return { delegated: subResults, synthesis };
  }

  // EXECUTE WITH STRATEGY
  async execute(plan: WorkflowPlan): Promise<any> {
    this.activeWorkflows.set(plan.id, plan);
    this.emit('workflow:started', plan.id);

    try {
      let result: any;

      switch (plan.strategy) {
        case 'sequential':
          result = await this.executeSequential(plan);
          break;
        case 'parallel':
          result = await this.executeParallel(plan);
          break;
        case 'consensus':
          result = await this.executeConsensus(plan);
          break;
        case 'hierarchical':
          result = await this.executeHierarchical(plan);
          break;
        default:
          throw new Error(`Unknown strategy: ${plan.strategy}`);
      }

      this.executionResults.set(plan.id, result);
      this.emit('workflow:completed', plan.id);
      return result;
    } catch (error) {
      this.emit('workflow:failed', { id: plan.id, error });
      throw error;
    }
  }

  getResult(workflowId: string): any {
    return this.executionResults.get(workflowId);
  }

  getActiveWorkflows(): WorkflowPlan[] {
    return Array.from(this.activeWorkflows.values());
  }
}

// ==========================================
// USAGE EXAMPLE
// ==========================================
async function main() {
  // Create orchestrator
  const orchestrator = new TeamOrchestrator();

  // Create agents
  const routerAgent = new OrchestrationAgent({
    id: 'router',
    name: 'Router',
    role: 'Manager',
    specialization: 'Request classification',
    capabilities: ['classify', 'route', 'prioritize']
  });

  const techAgent = new OrchestrationAgent({
    id: 'tech',
    name: 'Tech Expert',
    role: 'Engineer',
    specialization: 'Technical support',
    capabilities: ['diagnose', 'fix', 'document']
  });

  const billingAgent = new OrchestrationAgent({
    id: 'billing',
    name: 'Billing Specialist',
    role: 'Finance',
    specialization: 'Billing support',
    capabilities: ['verify', 'refund', 'adjust']
  });

  // Add agents to orchestrator
  orchestrator.addAgent(routerAgent);
  orchestrator.addAgent(techAgent);
  orchestrator.addAgent(billingAgent);

  console.log('👥 Team assembled:');
  orchestrator.listAgents().forEach(agent => {
    console.log(`  - ${agent.getName()} (${agent.getRole()})`);
  });

  // ==========================================
  // EXAMPLE 1: SEQUENTIAL - Process request step by step
  // ==========================================
  console.log('\n========== SEQUENTIAL EXECUTION ==========');
  const sequentialPlan: WorkflowPlan = {
    id: 'seq-001',
    strategy: 'sequential',
    expectedDuration: 30000,
    steps: [
      { agentId: 'router', task: 'Classify request', priority: 1 },
      { agentId: 'tech', task: 'Diagnose issue', priority: 2 },
      { agentId: 'billing', task: 'Offer refund', priority: 3 }
    ]
  };

  const seqResult = await orchestrator.execute(sequentialPlan);
  console.log('Result:', seqResult);

  // ==========================================
  // EXAMPLE 2: PARALLEL - Agents work simultaneously
  // ==========================================
  console.log('\n========== PARALLEL EXECUTION ==========');
  const parallelPlan: WorkflowPlan = {
    id: 'par-001',
    strategy: 'parallel',
    expectedDuration: 10000,
    steps: [
      { agentId: 'tech', task: 'Investigate technical issue', priority: 1 },
      { agentId: 'billing', task: 'Check customer account', priority: 1 }
    ]
  };

  const parResult = await orchestrator.execute(parallelPlan);
  console.log('Result:', parResult);

  // ==========================================
  // EXAMPLE 3: CONSENSUS - All agents agree
  // ==========================================
  console.log('\n========== CONSENSUS EXECUTION ==========');
  const consensusPlan: WorkflowPlan = {
    id: 'con-001',
    strategy: 'consensus',
    expectedDuration: 15000,
    steps: [
      { agentId: 'tech', task: 'Assess severity', priority: 1 },
      { agentId: 'billing', task: 'Evaluate cost', priority: 1 },
      { agentId: 'router', task: 'Check policy', priority: 1 }
    ]
  };

  const conResult = await orchestrator.execute(consensusPlan);
  console.log('Consensus Result:', conResult);
  console.log(`Agreement Level: ${(conResult.agreementLevel * 100).toFixed(1)}%`);

  // ==========================================
  // EXAMPLE 4: HIERARCHICAL - Manager coordinates
  // ==========================================
  console.log('\n========== HIERARCHICAL EXECUTION ==========');
  const hierarchicalPlan: WorkflowPlan = {
    id: 'hier-001',
    strategy: 'hierarchical',
    expectedDuration: 20000,
    steps: [
      { agentId: 'router', task: 'Coordinate resolution', priority: 1 },
      { agentId: 'tech', task: 'Execute fix', priority: 2 },
      { agentId: 'billing', task: 'Process refund', priority: 2 }
    ]
  };

  const hierResult = await orchestrator.execute(hierarchicalPlan);
  console.log('Hierarchical Result:', hierResult);

  // ==========================================
  // SUMMARY
  // ==========================================
  console.log('\n========== SUMMARY ==========');
  console.log(`Total workflows executed: ${orchestrator.getActiveWorkflows().length}`);
  console.log('Agents in team: ' + orchestrator.listAgents().map(a => a.getName()).join(', '));
}

// Run
main().catch(console.error);

from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
import time

# ==========================================
# AGENT DEFINITION
# ==========================================
class WorkflowStrategy(Enum):
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    CONSENSUS = "consensus"
    HIERARCHICAL = "hierarchical"


class OrchestrationAgent:
    def __init__(self, agent_id: str, name: str, role: str,
                 specialization: str, capabilities: List[str]):
        self.id = agent_id
        self.name = name
        self.role = role
        self.specialization = specialization
        self.capabilities = capabilities
        self.memory: Dict[str, Any] = {}

    def get_id(self) -> str:
        return self.id

    def get_name(self) -> str:
        return self.name

    def get_role(self) -> str:
        return self.role

    def get_specialization(self) -> str:
        return self.specialization

    def get_capabilities(self) -> List[str]:
        return self.capabilities

    async def process(self, input_text: str) -> Dict[str, Any]:
        # Simulate processing
        await asyncio.sleep(0.1)
        return {
            'analysis': f"[{self.name}] Analyzed: {input_text}",
            'score': time.time() % 100
        }

    async def store_memory(self, key: str, value: Any):
        self.memory[key] = {
            'value': value,
            'timestamp': time.time()
        }

    async def recall_memory(self, key: str) -> Optional[Any]:
        if key in self.memory:
            return self.memory[key]['value']
        return None


# ==========================================
# WORKFLOW STRUCTURES
# ==========================================
class WorkflowStep:
    def __init__(self, agent_id: str, task: str, priority: int):
        self.agent_id = agent_id
        self.task = task
        self.priority = priority


class WorkflowPlan:
    def __init__(self, plan_id: str, steps: List[WorkflowStep],
                 strategy: WorkflowStrategy, expected_duration: int):
        self.id = plan_id
        self.steps = steps
        self.strategy = strategy
        self.expected_duration = expected_duration


# ==========================================
# ORCHESTRATOR
# ==========================================
class TeamOrchestrator:
    def __init__(self):
        self.agents: Dict[str, OrchestrationAgent] = {}
        self.active_workflows: Dict[str, WorkflowPlan] = {}
        self.execution_results: Dict[str, Any] = {}

    def add_agent(self, agent: OrchestrationAgent):
        self.agents[agent.get_id()] = agent

    def remove_agent(self, agent_id: str):
        if agent_id in self.agents:
            del self.agents[agent_id]

    def get_agent(self, agent_id: str) -> Optional[OrchestrationAgent]:
        return self.agents.get(agent_id)

    def list_agents(self) -> List[OrchestrationAgent]:
        return list(self.agents.values())

    # SEQUENTIAL EXECUTION
    async def execute_sequential(self, plan: WorkflowPlan) -> List[Any]:
        print(f"📋 Executing sequential plan: {plan.id}")
        results = []

        # sorted() returns a new list, so the plan itself is left untouched
        sorted_steps = sorted(plan.steps, key=lambda s: s.priority)

        for step in sorted_steps:
            agent = self.agents.get(step.agent_id)
            if not agent:
                raise ValueError(f"Agent {step.agent_id} not found")

            print(f"  → {agent.get_name()}: {step.task}")
            result = await agent.process(step.task)
            results.append(result)

            # Record the result in this agent's own memory
            await agent.store_memory('last_result', result)

        return results

    # PARALLEL EXECUTION
    async def execute_parallel(self, plan: WorkflowPlan) -> List[Any]:
        print(f"⚡ Executing parallel plan: {plan.id}")

        tasks = []
        for step in plan.steps:
            agent = self.agents.get(step.agent_id)
            if not agent:
                raise ValueError(f"Agent {step.agent_id} not found")

            print(f"  → {agent.get_name()}: {step.task}")
            tasks.append(agent.process(step.task))

        # gather() runs the coroutines concurrently and preserves their order
        return await asyncio.gather(*tasks)

    # CONSENSUS EXECUTION
    async def execute_consensus(self, plan: WorkflowPlan,
                                threshold: float = 0.7) -> Dict[str, Any]:
        print(f"🤝 Executing consensus plan: {plan.id}")

        results = await self.execute_parallel(plan)
        scores = [r['score'] for r in results]
        average_score = sum(scores) / len(scores) if scores else 0

        # Agreement is approximated as the mean score scaled to the range 0..1
        agreement_level = average_score / 100

        consensus = {
            'average_score': average_score,
            'agreement_level': agreement_level,
            'passed': agreement_level >= threshold,
            'details': results
        }

        print(f"  Consensus: {agreement_level * 100:.1f}% agreement")
        return consensus

    # HIERARCHICAL EXECUTION
    async def execute_hierarchical(self, plan: WorkflowPlan,
                                   root_agent_id: Optional[str] = None) -> Dict[str, Any]:
        print(f"🏛️ Executing hierarchical plan: {plan.id}")

        # Default to the first step's agent when no root is given
        root_agent = None
        if root_agent_id:
            root_agent = self.agents.get(root_agent_id)
        elif plan.steps:
            root_agent = self.agents.get(plan.steps[0].agent_id)

        if not root_agent:
            raise ValueError("Root agent not found")

        # Root agent delegates
        delegated_tasks = [s for s in plan.steps if s.agent_id != root_agent.get_id()]

        sub_results = []
        for step in delegated_tasks:
            agent = self.agents.get(step.agent_id)
            # Fail fast rather than silently dropping a delegated task
            if not agent:
                raise ValueError(f"Agent {step.agent_id} not found")
            result = await agent.process(step.task)
            sub_results.append(result)

        # Root synthesizes
        synthesis = await root_agent.process(
            f"Synthesize: {' | '.join(r['analysis'] for r in sub_results)}"
        )

        return {
            'delegated': sub_results,
            'synthesis': synthesis
        }

    # EXECUTE WITH STRATEGY
    async def execute(self, plan: WorkflowPlan) -> Any:
        self.active_workflows[plan.id] = plan

        try:
            if plan.strategy == WorkflowStrategy.SEQUENTIAL:
                result = await self.execute_sequential(plan)
            elif plan.strategy == WorkflowStrategy.PARALLEL:
                result = await self.execute_parallel(plan)
            elif plan.strategy == WorkflowStrategy.CONSENSUS:
                result = await self.execute_consensus(plan)
            elif plan.strategy == WorkflowStrategy.HIERARCHICAL:
                result = await self.execute_hierarchical(plan)
            else:
                raise ValueError(f"Unknown strategy: {plan.strategy}")

            self.execution_results[plan.id] = result
            return result
        except Exception as error:
            print(f"❌ Workflow failed: {error}")
            raise

    def get_result(self, workflow_id: str) -> Optional[Any]:
        return self.execution_results.get(workflow_id)

    def get_active_workflows(self) -> List[WorkflowPlan]:
        return list(self.active_workflows.values())


# ==========================================
# USAGE EXAMPLE
# ==========================================
async def main():
    # Create orchestrator
    orchestrator = TeamOrchestrator()

    # Create agents
    router_agent = OrchestrationAgent(
        'router', 'Router', 'Manager',
        'Request classification',
        ['classify', 'route', 'prioritize']
    )

    tech_agent = OrchestrationAgent(
        'tech', 'Tech Expert', 'Engineer',
        'Technical support',
        ['diagnose', 'fix', 'document']
    )

    billing_agent = OrchestrationAgent(
        'billing', 'Billing Specialist', 'Finance',
        'Billing support',
        ['verify', 'refund', 'adjust']
    )

    # Add agents
    orchestrator.add_agent(router_agent)
    orchestrator.add_agent(tech_agent)
    orchestrator.add_agent(billing_agent)

    print('👥 Team assembled:')
    for agent in orchestrator.list_agents():
        print(f"  - {agent.get_name()} ({agent.get_role()})")

    # ========== SEQUENTIAL ==========
    print('\n========== SEQUENTIAL EXECUTION ==========')
    seq_plan = WorkflowPlan(
        'seq-001',
        [
            WorkflowStep('router', 'Classify request', 1),
            WorkflowStep('tech', 'Diagnose issue', 2),
            WorkflowStep('billing', 'Offer refund', 3)
        ],
        WorkflowStrategy.SEQUENTIAL,
        30000
    )
    seq_result = await orchestrator.execute(seq_plan)
    print('Result:', seq_result)

    # ========== PARALLEL ==========
    print('\n========== PARALLEL EXECUTION ==========')
    par_plan = WorkflowPlan(
        'par-001',
        [
            WorkflowStep('tech', 'Investigate technical issue', 1),
            WorkflowStep('billing', 'Check customer account', 1)
        ],
        WorkflowStrategy.PARALLEL,
        10000
    )
    par_result = await orchestrator.execute(par_plan)
    print('Result:', par_result)

    # ========== CONSENSUS ==========
    print('\n========== CONSENSUS EXECUTION ==========')
    con_plan = WorkflowPlan(
        'con-001',
        [
            WorkflowStep('tech', 'Assess severity', 1),
            WorkflowStep('billing', 'Evaluate cost', 1),
            WorkflowStep('router', 'Check policy', 1)
        ],
        WorkflowStrategy.CONSENSUS,
        15000
    )
    con_result = await orchestrator.execute(con_plan)
    print(f"Consensus Result: {con_result}")
    print(f"Agreement Level: {con_result['agreement_level'] * 100:.1f}%")

    # ========== HIERARCHICAL ==========
    print('\n========== HIERARCHICAL EXECUTION ==========')
    hier_plan = WorkflowPlan(
        'hier-001',
        [
            WorkflowStep('router', 'Coordinate resolution', 1),
            WorkflowStep('tech', 'Execute fix', 2),
            WorkflowStep('billing', 'Process refund', 2)
        ],
        WorkflowStrategy.HIERARCHICAL,
        20000
    )
    hier_result = await orchestrator.execute(hier_plan)
    print('Hierarchical Result:', hier_result)

    # Summary
    print('\n========== SUMMARY ==========')
    print(f"Total workflows: {len(orchestrator.get_active_workflows())}")
    print(f"Agents: {', '.join(a.get_name() for a in orchestrator.list_agents())}")


# Run
if __name__ == '__main__':
    asyncio.run(main())

Expected Output
Scores are generated at run time, so the values shown here are illustrative.

👥 Team assembled:
  - Router (Manager)
  - Tech Expert (Engineer)
  - Billing Specialist (Finance)

========== SEQUENTIAL EXECUTION ==========
📋 Executing sequential plan: seq-001
  → Router: Classify request
  → Tech Expert: Diagnose issue
  → Billing Specialist: Offer refund
Result: [
  { analysis: '[Router] Analyzed: Classify request', score: 45.2 },
  { analysis: '[Tech Expert] Analyzed: Diagnose issue', score: 67.8 },
  { analysis: '[Billing Specialist] Analyzed: Offer refund', score: 82.1 }
]

========== PARALLEL EXECUTION ==========
⚡ Executing parallel plan: par-001
  → Tech Expert: Investigate technical issue
  → Billing Specialist: Check customer account
Result: [
  { analysis: '[Tech Expert] Analyzed: Investigate technical issue', score: 71.5 },
  { analysis: '[Billing Specialist] Analyzed: Check customer account', score: 56.3 }
]

========== CONSENSUS EXECUTION ==========
🤝 Executing consensus plan: con-001
  → Tech Expert: Assess severity
  → Billing Specialist: Evaluate cost
  → Router: Check policy
  Consensus: 65.2% agreement
Consensus Result: {
  average_score: 65.2,
  agreement_level: 0.652,
  passed: false,
  details: [...]
}

========== HIERARCHICAL EXECUTION ==========
🏛️ Executing hierarchical plan: hier-001
Hierarchical Result: {
  delegated: [...],
  synthesis: {...}
}

========== SUMMARY ==========
Total workflows: 4
Agents: Router, Tech Expert, Billing Specialist

Key Concepts
Sequential: Agents work one after another, each using results from the previous agent. Best for processes with dependencies.
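In the implementation above, each sequential step receives only its own task string, so results are not forwarded automatically. One way to chain them is sketched below; the stage functions and `run_chain` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List

# A stage takes the previous stage's output and returns its own output.
Stage = Callable[[str], Awaitable[str]]


async def classify_stage(text: str) -> str:
    return f"classified({text})"


async def diagnose_stage(text: str) -> str:
    return f"diagnosed({text})"


async def resolve_stage(text: str) -> str:
    return f"resolved({text})"


async def run_chain(stages: List[Stage], initial: str) -> str:
    """Run the stages in order, feeding each result into the next stage."""
    current = initial
    for stage in stages:
        current = await stage(current)
    return current


if __name__ == "__main__":
    stages = [classify_stage, diagnose_stage, resolve_stage]
    print(asyncio.run(run_chain(stages, "ticket-42")))
    # resolved(diagnosed(classified(ticket-42)))
```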
Parallel: Agents work simultaneously on independent tasks. Fastest when steps do not depend on one another.
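The speed-up can be seen with a small timing sketch. It assumes each agent call is I/O-bound and is simulated here with `asyncio.sleep`; `simulated_task` and `run_parallel` are hypothetical helpers, not part of the orchestrator above.

```python
import asyncio
import time
from typing import List, Tuple


async def simulated_task(name: str, seconds: float) -> str:
    # Stand-in for an agent call that waits on a network or model response.
    await asyncio.sleep(seconds)
    return name


async def run_parallel(delay: float = 0.2) -> Tuple[List[str], float]:
    """Run three simulated agents concurrently and measure the elapsed time."""
    start = time.perf_counter()
    results = await asyncio.gather(
        simulated_task("tech", delay),
        simulated_task("billing", delay),
        simulated_task("router", delay),
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    names, elapsed = asyncio.run(run_parallel())
    # Elapsed time is close to one delay, not three, because the waits overlap.
    print(names, f"{elapsed:.2f}s")
```

`asyncio.gather` returns results in the order the coroutines were passed in, regardless of which one finishes first.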
Consensus: All agents analyze the same problem and their results are aggregated. Best for critical decisions.
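The implementation above aggregates by averaging numeric scores. An alternative aggregation, shown here only as a sketch, counts explicit approve/reject votes and compares the approving fraction with a threshold. The `agreement` function and its return keys are assumptions made for this example.

```python
from typing import Dict, List, Union


def agreement(votes: List[bool], threshold: float = 0.7) -> Dict[str, Union[float, bool]]:
    """Return the fraction of approving votes and whether it meets the threshold."""
    if not votes:
        raise ValueError("at least one vote is required")
    level = sum(votes) / len(votes)
    return {"agreement_level": level, "passed": level >= threshold}


if __name__ == "__main__":
    # Two of three agents approve: about 0.67, below the 0.7 threshold.
    print(agreement([True, True, False]))
    # Three of four agents approve: 0.75, which passes.
    print(agreement([True, True, True, False]))
```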
Hierarchical: A manager agent coordinates specialists. Best for complex problems requiring oversight.
Use Cases
Customer Support Escalation
Tier 1 agent tries to solve → Escalates to Tier 2 → Escalates to Manager for final decision
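A minimal sketch of such an escalation chain follows. It assumes each tier returns a resolution string, or `None` when it cannot resolve the issue; the handlers, their matching rules, and the `escalate` helper are all hypothetical.

```python
from typing import Callable, List, Optional, Tuple

# A handler returns a resolution, or None to escalate to the next tier.
Handler = Callable[[str], Optional[str]]


def tier1(issue: str) -> Optional[str]:
    return "Sent password reset link" if "password" in issue.lower() else None


def tier2(issue: str) -> Optional[str]:
    return "Applied configuration fix" if "config" in issue.lower() else None


def manager(issue: str) -> Optional[str]:
    # The final tier always makes a decision.
    return f"Manager decision recorded for: {issue}"


TIERS: List[Tuple[str, Handler]] = [
    ("tier1", tier1),
    ("tier2", tier2),
    ("manager", manager),
]


def escalate(issue: str, tiers: List[Tuple[str, Handler]]) -> Tuple[str, str]:
    """Try each tier in order and return (tier name, resolution) for the first success."""
    for name, handler in tiers:
        resolution = handler(issue)
        if resolution is not None:
            return name, resolution
    raise RuntimeError("no tier resolved the issue")


if __name__ == "__main__":
    print(escalate("Forgot my password", TIERS))
    print(escalate("Contract dispute", TIERS))
```

This is the sequential strategy with an early exit: later tiers run only when earlier ones decline.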
Code Review Process
Multiple specialist agents review code in parallel → Consensus on approval → Manager signs off
Risk Assessment
Security agent, Compliance agent, and Financial agent analyze risk independently → Vote on decision
Content Moderation
Multiple moderators review content in parallel → Consensus threshold triggers action