Skip to content

Multi-Agent Orchestration

Information

Requires: ARAL-ORCH profile (Layers 1-6) for team coordination and orchestration

This example demonstrates how to orchestrate multiple ARAL agents working together on complex tasks that require coordination, consensus, and delegated workflows.


We’ll build a team of 3 specialized agents:

  • Router Agent: Classifies incoming requests
  • Technical Agent: Handles technical issues
  • Billing Agent: Handles billing issues

Chat

Route

Route

Execute

Execute

Aggregate

Return

User Request

Router Agent

Persona: Manager

Technical Agent

Persona: Engineer

Billing Agent

Persona: Finance

Consensus Engine

Final Response


orchestration.ts
import { EventEmitter } from 'events';
// ==========================================
// AGENT DEFINITION
// ==========================================
interface AgentConfig {
id: string;
name: string;
role: string;
specialization: string;
capabilities: string[];
}
class OrchestrationAgent {
private id: string;
private name: string;
private role: string;
private specialization: string;
private capabilities: string[];
private memory: Map<string, any> = new Map();
constructor(config: AgentConfig) {
this.id = config.id;
this.name = config.name;
this.role = config.role;
this.specialization = config.specialization;
this.capabilities = config.capabilities;
}
getId(): string { return this.id; }
getName(): string { return this.name; }
getRole(): string { return this.role; }
getSpecialization(): string { return this.specialization; }
getCapabilities(): string[] { return this.capabilities; }
async process(input: string): Promise<{ analysis: string; score: number }> {
// Simulate agent processing
return {
analysis: `[${this.name}] Analyzed: ${input}`,
score: Math.random() * 100
};
}
async storeMemory(key: string, value: any): Promise<void> {
this.memory.set(key, { value, timestamp: Date.now() });
}
async recallMemory(key: string): Promise<any> {
return this.memory.get(key)?.value;
}
}
// ==========================================
// ORCHESTRATION STRATEGIES
// ==========================================
type WorkflowStrategy = 'sequential' | 'parallel' | 'consensus' | 'hierarchical';
interface WorkflowStep {
agentId: string;
task: string;
priority: number;
}
interface WorkflowPlan {
id: string;
steps: WorkflowStep[];
strategy: WorkflowStrategy;
expectedDuration: number;
}
// ==========================================
// ORCHESTRATOR
// ==========================================
class TeamOrchestrator extends EventEmitter {
private agents: Map<string, OrchestrationAgent> = new Map();
private activeWorkflows: Map<string, WorkflowPlan> = new Map();
private executionResults: Map<string, any> = new Map();
addAgent(agent: OrchestrationAgent): void {
this.agents.set(agent.getId(), agent);
this.emit('agent:added', agent.getId());
}
removeAgent(agentId: string): void {
this.agents.delete(agentId);
this.emit('agent:removed', agentId);
}
getAgent(agentId: string): OrchestrationAgent | undefined {
return this.agents.get(agentId);
}
listAgents(): OrchestrationAgent[] {
return Array.from(this.agents.values());
}
// SEQUENTIAL EXECUTION
async executeSequential(plan: WorkflowPlan): Promise<any[]> {
console.log(`📋 Executing sequential plan: ${plan.id}`);
const results: any[] = [];
for (const step of plan.steps.sort((a, b) => a.priority - b.priority)) {
const agent = this.agents.get(step.agentId);
if (!agent) throw new Error(`Agent ${step.agentId} not found`);
console.log(`${agent.getName()}: ${step.task}`);
const result = await agent.process(step.task);
results.push(result);
// Store intermediate result for next agent
await agent.storeMemory('last_result', result);
}
return results;
}
// PARALLEL EXECUTION
async executeParallel(plan: WorkflowPlan): Promise<any[]> {
console.log(`⚡ Executing parallel plan: ${plan.id}`);
const promises = plan.steps.map(step => {
const agent = this.agents.get(step.agentId);
if (!agent) throw new Error(`Agent ${step.agentId} not found`);
console.log(`${agent.getName()}: ${step.task}`);
return agent.process(step.task);
});
return Promise.all(promises);
}
// CONSENSUS EXECUTION
async executeConsensus(plan: WorkflowPlan, threshold: number = 0.7): Promise<any> {
console.log(`🤝 Executing consensus plan: ${plan.id}`);
const results = await this.executeParallel(plan);
const scores = results.map((r: any) => r.score);
const averageScore = scores.reduce((a, b) => a + b, 0) / scores.length;
const consensus = {
averageScore,
agreementLevel: averageScore / 100,
passed: (averageScore / 100) >= threshold,
details: results
};
console.log(` Consensus: ${(consensus.agreementLevel * 100).toFixed(1)}% agreement`);
return consensus;
}
// HIERARCHICAL EXECUTION
async executeHierarchical(plan: WorkflowPlan, rootAgentId?: string): Promise<any> {
console.log(`🏛️ Executing hierarchical plan: ${plan.id}`);
const rootAgent = rootAgentId
? this.agents.get(rootAgentId)
: this.agents.get(plan.steps[0].agentId);
if (!rootAgent) throw new Error('Root agent not found');
// Root agent delegates to sub-agents
const delegatedTasks = plan.steps.filter(s => s.agentId !== rootAgent.getId());
const subResults = await Promise.all(
delegatedTasks.map(step => {
const agent = this.agents.get(step.agentId);
return agent?.process(step.task);
})
);
// Root agent synthesizes results
const synthesis = await rootAgent.process(
`Synthesize: ${subResults.map((r: any) => r.analysis).join(' | ')}`
);
return {
delegated: subResults,
synthesis
};
}
// EXECUTE WITH STRATEGY
async execute(plan: WorkflowPlan): Promise<any> {
this.activeWorkflows.set(plan.id, plan);
this.emit('workflow:started', plan.id);
try {
let result: any;
switch (plan.strategy) {
case 'sequential':
result = await this.executeSequential(plan);
break;
case 'parallel':
result = await this.executeParallel(plan);
break;
case 'consensus':
result = await this.executeConsensus(plan);
break;
case 'hierarchical':
result = await this.executeHierarchical(plan);
break;
default:
throw new Error(`Unknown strategy: ${plan.strategy}`);
}
this.executionResults.set(plan.id, result);
this.emit('workflow:completed', plan.id);
return result;
} catch (error) {
this.emit('workflow:failed', { id: plan.id, error });
throw error;
}
}
getResult(workflowId: string): any {
return this.executionResults.get(workflowId);
}
getActiveWorkflows(): WorkflowPlan[] {
return Array.from(this.activeWorkflows.values());
}
}
// ==========================================
// USAGE EXAMPLE
// ==========================================
async function main() {
// Create orchestrator
const orchestrator = new TeamOrchestrator();
// Create agents
const routerAgent = new OrchestrationAgent({
id: 'router',
name: 'Router',
role: 'Manager',
specialization: 'Request classification',
capabilities: ['classify', 'route', 'prioritize']
});
const techAgent = new OrchestrationAgent({
id: 'tech',
name: 'Tech Expert',
role: 'Engineer',
specialization: 'Technical support',
capabilities: ['diagnose', 'fix', 'document']
});
const billingAgent = new OrchestrationAgent({
id: 'billing',
name: 'Billing Specialist',
role: 'Finance',
specialization: 'Billing support',
capabilities: ['verify', 'refund', 'adjust']
});
// Add agents to orchestrator
orchestrator.addAgent(routerAgent);
orchestrator.addAgent(techAgent);
orchestrator.addAgent(billingAgent);
console.log('👥 Team assembled:');
orchestrator.listAgents().forEach(agent => {
console.log(` - ${agent.getName()} (${agent.getRole()})`);
});
// ==========================================
// EXAMPLE 1: SEQUENTIAL - Process request step by step
// ==========================================
console.log('\n========== SEQUENTIAL EXECUTION ==========');
const sequentialPlan: WorkflowPlan = {
id: 'seq-001',
strategy: 'sequential',
expectedDuration: 30000,
steps: [
{ agentId: 'router', task: 'Classify request', priority: 1 },
{ agentId: 'tech', task: 'Diagnose issue', priority: 2 },
{ agentId: 'billing', task: 'Offer refund', priority: 3 }
]
};
const seqResult = await orchestrator.execute(sequentialPlan);
console.log('Result:', seqResult);
// ==========================================
// EXAMPLE 2: PARALLEL - Agents work simultaneously
// ==========================================
console.log('\n========== PARALLEL EXECUTION ==========');
const parallelPlan: WorkflowPlan = {
id: 'par-001',
strategy: 'parallel',
expectedDuration: 10000,
steps: [
{ agentId: 'tech', task: 'Investigate technical issue', priority: 1 },
{ agentId: 'billing', task: 'Check customer account', priority: 1 }
]
};
const parResult = await orchestrator.execute(parallelPlan);
console.log('Result:', parResult);
// ==========================================
// EXAMPLE 3: CONSENSUS - All agents agree
// ==========================================
console.log('\n========== CONSENSUS EXECUTION ==========');
const consensusPlan: WorkflowPlan = {
id: 'con-001',
strategy: 'consensus',
expectedDuration: 15000,
steps: [
{ agentId: 'tech', task: 'Assess severity', priority: 1 },
{ agentId: 'billing', task: 'Evaluate cost', priority: 1 },
{ agentId: 'router', task: 'Check policy', priority: 1 }
]
};
const conResult = await orchestrator.execute(consensusPlan);
console.log('Consensus Result:', conResult);
console.log(`Agreement Level: ${(conResult.agreementLevel * 100).toFixed(1)}%`);
// ==========================================
// EXAMPLE 4: HIERARCHICAL - Manager coordinates
// ==========================================
console.log('\n========== HIERARCHICAL EXECUTION ==========');
const hierarchicalPlan: WorkflowPlan = {
id: 'hier-001',
strategy: 'hierarchical',
expectedDuration: 20000,
steps: [
{ agentId: 'router', task: 'Coordinate resolution', priority: 1 },
{ agentId: 'tech', task: 'Execute fix', priority: 2 },
{ agentId: 'billing', task: 'Process refund', priority: 2 }
]
};
const hierResult = await orchestrator.execute(hierarchicalPlan);
console.log('Hierarchical Result:', hierResult);
// ==========================================
// SUMMARY
// ==========================================
console.log('\n========== SUMMARY ==========');
console.log(`Total workflows executed: ${orchestrator.getActiveWorkflows().length}`);
console.log('Agents in team: ' + orchestrator.listAgents().map(a => a.getName()).join(', '));
}
// Run
main().catch(console.error);
orchestration.py
from typing import List, Dict, Any, Optional
from enum import Enum
import asyncio
import time
# ==========================================
# AGENT DEFINITION
# ==========================================
class WorkflowStrategy(Enum):
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
CONSENSUS = "consensus"
HIERARCHICAL = "hierarchical"
class OrchestrationAgent:
def __init__(self, agent_id: str, name: str, role: str,
specialization: str, capabilities: List[str]):
self.id = agent_id
self.name = name
self.role = role
self.specialization = specialization
self.capabilities = capabilities
self.memory = {}
def get_id(self) -> str:
return self.id
def get_name(self) -> str:
return self.name
def get_role(self) -> str:
return self.role
def get_specialization(self) -> str:
return self.specialization
def get_capabilities(self) -> List[str]:
return self.capabilities
async def process(self, input_text: str) -> Dict[str, Any]:
# Simulate processing
await asyncio.sleep(0.1)
return {
'analysis': f"[{self.name}] Analyzed: {input_text}",
'score': time.time() % 100
}
async def store_memory(self, key: str, value: Any):
self.memory[key] = {
'value': value,
'timestamp': time.time()
}
async def recall_memory(self, key: str) -> Optional[Any]:
if key in self.memory:
return self.memory[key]['value']
return None
# ==========================================
# WORKFLOW STRUCTURES
# ==========================================
class WorkflowStep:
def __init__(self, agent_id: str, task: str, priority: int):
self.agent_id = agent_id
self.task = task
self.priority = priority
class WorkflowPlan:
def __init__(self, plan_id: str, steps: List[WorkflowStep],
strategy: WorkflowStrategy, expected_duration: int):
self.id = plan_id
self.steps = steps
self.strategy = strategy
self.expected_duration = expected_duration
# ==========================================
# ORCHESTRATOR
# ==========================================
class TeamOrchestrator:
def __init__(self):
self.agents: Dict[str, OrchestrationAgent] = {}
self.active_workflows: Dict[str, WorkflowPlan] = {}
self.execution_results: Dict[str, Any] = {}
def add_agent(self, agent: OrchestrationAgent):
self.agents[agent.get_id()] = agent
def remove_agent(self, agent_id: str):
if agent_id in self.agents:
del self.agents[agent_id]
def get_agent(self, agent_id: str) -> Optional[OrchestrationAgent]:
return self.agents.get(agent_id)
def list_agents(self) -> List[OrchestrationAgent]:
return list(self.agents.values())
# SEQUENTIAL EXECUTION
async def execute_sequential(self, plan: WorkflowPlan) -> List[Any]:
print(f"📋 Executing sequential plan: {plan.id}")
results = []
sorted_steps = sorted(plan.steps, key=lambda s: s.priority)
for step in sorted_steps:
agent = self.agents.get(step.agent_id)
if not agent:
raise ValueError(f"Agent {step.agent_id} not found")
print(f" → {agent.get_name()}: {step.task}")
result = await agent.process(step.task)
results.append(result)
await agent.store_memory('last_result', result)
return results
# PARALLEL EXECUTION
async def execute_parallel(self, plan: WorkflowPlan) -> List[Any]:
print(f"⚡ Executing parallel plan: {plan.id}")
tasks = []
for step in plan.steps:
agent = self.agents.get(step.agent_id)
if not agent:
raise ValueError(f"Agent {step.agent_id} not found")
print(f" → {agent.get_name()}: {step.task}")
tasks.append(agent.process(step.task))
return await asyncio.gather(*tasks)
# CONSENSUS EXECUTION
async def execute_consensus(self, plan: WorkflowPlan, threshold: float = 0.7) -> Dict[str, Any]:
print(f"🤝 Executing consensus plan: {plan.id}")
results = await self.execute_parallel(plan)
scores = [r['score'] for r in results]
average_score = sum(scores) / len(scores) if scores else 0
agreement_level = average_score / 100
consensus = {
'average_score': average_score,
'agreement_level': agreement_level,
'passed': agreement_level >= threshold,
'details': results
}
print(f" Consensus: {agreement_level * 100:.1f}% agreement")
return consensus
# HIERARCHICAL EXECUTION
async def execute_hierarchical(self, plan: WorkflowPlan,
root_agent_id: Optional[str] = None) -> Dict[str, Any]:
print(f"🏛️ Executing hierarchical plan: {plan.id}")
root_agent = None
if root_agent_id:
root_agent = self.agents.get(root_agent_id)
else:
root_agent = self.agents.get(plan.steps[0].agent_id)
if not root_agent:
raise ValueError("Root agent not found")
# Root agent delegates
delegated_tasks = [s for s in plan.steps if s.agent_id != root_agent.get_id()]
sub_results = []
for step in delegated_tasks:
agent = self.agents.get(step.agent_id)
if agent:
result = await agent.process(step.task)
sub_results.append(result)
# Root synthesizes
synthesis = await root_agent.process(
f"Synthesize: {' | '.join(r['analysis'] for r in sub_results)}"
)
return {
'delegated': sub_results,
'synthesis': synthesis
}
# EXECUTE WITH STRATEGY
async def execute(self, plan: WorkflowPlan) -> Any:
self.active_workflows[plan.id] = plan
try:
if plan.strategy == WorkflowStrategy.SEQUENTIAL:
result = await self.execute_sequential(plan)
elif plan.strategy == WorkflowStrategy.PARALLEL:
result = await self.execute_parallel(plan)
elif plan.strategy == WorkflowStrategy.CONSENSUS:
result = await self.execute_consensus(plan)
elif plan.strategy == WorkflowStrategy.HIERARCHICAL:
result = await self.execute_hierarchical(plan)
else:
raise ValueError(f"Unknown strategy: {plan.strategy}")
self.execution_results[plan.id] = result
return result
except Exception as error:
print(f"❌ Workflow failed: {error}")
raise
def get_result(self, workflow_id: str) -> Optional[Any]:
return self.execution_results.get(workflow_id)
def get_active_workflows(self) -> List[WorkflowPlan]:
return list(self.active_workflows.values())
# ==========================================
# USAGE EXAMPLE
# ==========================================
async def main():
# Create orchestrator
orchestrator = TeamOrchestrator()
# Create agents
router_agent = OrchestrationAgent(
'router', 'Router', 'Manager', 'Request classification',
['classify', 'route', 'prioritize']
)
tech_agent = OrchestrationAgent(
'tech', 'Tech Expert', 'Engineer', 'Technical support',
['diagnose', 'fix', 'document']
)
billing_agent = OrchestrationAgent(
'billing', 'Billing Specialist', 'Finance', 'Billing support',
['verify', 'refund', 'adjust']
)
# Add agents
orchestrator.add_agent(router_agent)
orchestrator.add_agent(tech_agent)
orchestrator.add_agent(billing_agent)
print('👥 Team assembled:')
for agent in orchestrator.list_agents():
print(f" - {agent.get_name()} ({agent.get_role()})")
# ========== SEQUENTIAL ==========
print('\n========== SEQUENTIAL EXECUTION ==========')
seq_plan = WorkflowPlan(
'seq-001',
[
WorkflowStep('router', 'Classify request', 1),
WorkflowStep('tech', 'Diagnose issue', 2),
WorkflowStep('billing', 'Offer refund', 3)
],
WorkflowStrategy.SEQUENTIAL,
30000
)
seq_result = await orchestrator.execute(seq_plan)
print('Result:', seq_result)
# ========== PARALLEL ==========
print('\n========== PARALLEL EXECUTION ==========')
par_plan = WorkflowPlan(
'par-001',
[
WorkflowStep('tech', 'Investigate technical issue', 1),
WorkflowStep('billing', 'Check customer account', 1)
],
WorkflowStrategy.PARALLEL,
10000
)
par_result = await orchestrator.execute(par_plan)
print('Result:', par_result)
# ========== CONSENSUS ==========
print('\n========== CONSENSUS EXECUTION ==========')
con_plan = WorkflowPlan(
'con-001',
[
WorkflowStep('tech', 'Assess severity', 1),
WorkflowStep('billing', 'Evaluate cost', 1),
WorkflowStep('router', 'Check policy', 1)
],
WorkflowStrategy.CONSENSUS,
15000
)
con_result = await orchestrator.execute(con_plan)
print(f"Consensus Result: {con_result}")
print(f"Agreement Level: {con_result['agreement_level'] * 100:.1f}%")
# ========== HIERARCHICAL ==========
print('\n========== HIERARCHICAL EXECUTION ==========')
hier_plan = WorkflowPlan(
'hier-001',
[
WorkflowStep('router', 'Coordinate resolution', 1),
WorkflowStep('tech', 'Execute fix', 2),
WorkflowStep('billing', 'Process refund', 2)
],
WorkflowStrategy.HIERARCHICAL,
20000
)
hier_result = await orchestrator.execute(hier_plan)
print('Hierarchical Result:', hier_result)
# Summary
print('\n========== SUMMARY ==========')
print(f"Total workflows: {len(orchestrator.get_active_workflows())}")
print(f"Agents: {', '.join(a.get_name() for a in orchestrator.list_agents())}")
# Run
if __name__ == '__main__':
asyncio.run(main())

👥 Team assembled:
- Router (Manager)
- Tech Expert (Engineer)
- Billing Specialist (Finance)
========== SEQUENTIAL EXECUTION ==========
📋 Executing sequential plan: seq-001
→ Router: Classify request
→ Tech Expert: Diagnose issue
→ Billing Specialist: Offer refund
Result: [
{ analysis: '[Router] Analyzed: Classify request', score: 45.2 },
{ analysis: '[Tech Expert] Analyzed: Diagnose issue', score: 67.8 },
{ analysis: '[Billing Specialist] Analyzed: Offer refund', score: 82.1 }
]
========== PARALLEL EXECUTION ==========
⚡ Executing parallel plan: par-001
→ Tech Expert: Investigate technical issue
→ Billing Specialist: Check customer account
Result: [
{ analysis: '[Tech Expert] Analyzed: Investigate technical issue', score: 71.5 },
{ analysis: '[Billing Specialist] Analyzed: Check customer account', score: 56.3 }
]
========== CONSENSUS EXECUTION ==========
🤝 Executing consensus plan: con-001
→ Tech Expert: Assess severity
→ Billing Specialist: Evaluate cost
→ Router: Check policy
Consensus: 65.2% agreement
Consensus Result: {
average_score: 65.2,
agreement_level: 0.652,
passed: false,
details: [...]
}
========== HIERARCHICAL EXECUTION ==========
🏛️ Executing hierarchical plan: hier-001
→ Router: Coordinate resolution
→ Tech Expert: Execute fix
→ Billing Specialist: Process refund
Hierarchical Result: {
delegated: [...],
synthesis: {...}
}
========== SUMMARY ==========
Total workflows: 4
Agents: Router, Tech Expert, Billing Specialist


Customer Support Escalation

Tier 1 agent tries to solve → Escalates to Tier 2 → Escalates to Manager for final decision

Code Review Process

Multiple specialist agents review code in parallel → Consensus on approval → Manager signs off

Risk Assessment

Security agent, Compliance agent, and Financial agent analyze risk independently → Vote on decision

Content Moderation

Multiple moderators review content in parallel → Consensus threshold triggers action