INTEROP Profile
Add protocol layer for external communication
The ORCH profile extends CORE with Layer 6 (Orchestration) for multi-agent coordination.
ORCH profile = CORE profile (L1-L5) + Layer 6 (Orchestration)
This enables:
from aral import Orchestrator, CircuitBreaker, LoadBalancer
orchestrator = Orchestrator( orchestrator_id="orch-1", port=8081)
# Configure circuit breakercircuit_breaker = CircuitBreaker( failure_threshold=5, success_threshold=3, timeout_ms=60000, half_open_requests=1)orchestrator.set_circuit_breaker(circuit_breaker)
# Configure load balancerload_balancer = LoadBalancer( strategy="health_based", minimum_health_score=0.7)orchestrator.set_load_balancer(load_balancer)
# Start orchestratororchestrator.start()# Agent registers with orchestratoragent_info = { "id": "agent-1", "endpoint": "http://agent1:8080", "health_endpoint": "http://agent1:8080/health", "capabilities": ["web_search", "calculate"], "version": "1.0.0"}
orchestrator.register_agent(agent_info)@orchestrator.route("/invoke")def route_request(request): # Extract requirements required_capability = request.json.get("capability") trace_id = request.headers.get("X-Trace-Id")
# Find suitable agent agent = orchestrator.select_agent( capability=required_capability, trace_id=trace_id )
if not agent: return { "error": "no_agent_available", "message": "No agent with required capability found" }, 503
# Check persona constraints if not orchestrator.validate_persona(agent, request.json): return { "error": "persona_violation", "message": "Request violates agent persona constraints" }, 403
# Forward request return orchestrator.forward_request(agent, request, trace_id)class CircuitBreaker: def __init__(self, failure_threshold, success_threshold, timeout_ms): self.failure_threshold = failure_threshold self.success_threshold = success_threshold self.timeout_ms = timeout_ms self.state = "closed" self.failures = {} self.successes = {}
def call(self, agent_id, func): """Execute function with circuit breaker""" if self.state == "open": if self._should_try_half_open(agent_id): self.state = "half_open" else: raise CircuitBreakerOpen(f"Circuit open for {agent_id}")
try: result = func() self._record_success(agent_id) return result except Exception as e: self._record_failure(agent_id) raise
def _record_failure(self, agent_id): """Record failure and potentially open circuit""" self.failures[agent_id] = self.failures.get(agent_id, 0) + 1
if self.failures[agent_id] >= self.failure_threshold: self.state = "open" self.opened_at = time.time()
def _record_success(self, agent_id): """Record success and potentially close circuit""" self.successes[agent_id] = self.successes.get(agent_id, 0) + 1
if self.state == "half_open": if self.successes[agent_id] >= self.success_threshold: self.state = "closed" self.failures[agent_id] = 0class LoadBalancer: def __init__(self, strategy="health_based"): self.strategy = strategy
def select_agent(self, agents, capability=None): """Select best agent based on strategy""" # Filter by capability if capability: agents = [a for a in agents if capability in a.capabilities]
if not agents: return None
if self.strategy == "round_robin": return self._round_robin(agents) elif self.strategy == "least_connections": return self._least_connections(agents) elif self.strategy == "health_based": return self._health_based(agents) else: return agents[0]
def _health_based(self, agents): """Select agent with best health score""" healthy_agents = [a for a in agents if a.health_score > 0.7] if not healthy_agents: return None return max(healthy_agents, key=lambda a: a.health_score)def validate_persona(orchestrator, agent, request): """Validate request against agent persona""" capability = request["capability"] persona = agent.persona
# Check allowed capabilities if capability in persona.constraints["denied_capabilities"]: orchestrator.log_violation(agent.id, capability, "denied") return False
allowed = persona.constraints["allowed_capabilities"] if "*" not in allowed and capability not in allowed: orchestrator.log_violation(agent.id, capability, "not_allowed") return False
return Truedef forward_request(orchestrator, agent, request, trace_id): """Forward request with trace context""" headers = { "X-Trace-Id": trace_id, "X-Parent-Span-Id": request.span_id, "X-Span-Id": str(uuid.uuid4()) }
response = requests.post( f"{agent.endpoint}/invoke", json=request.json, headers=headers, timeout=30 )
return response.json()import asyncio
async def monitor_agent_health(orchestrator): """Continuously monitor agent health""" while True: for agent in orchestrator.agents: try: response = requests.get( agent.health_endpoint, timeout=5 )
if response.status_code == 200: health_data = response.json() agent.health_score = 1.0 agent.last_heartbeat = time.time() else: agent.health_score *= 0.8 except Exception: agent.health_score *= 0.5
await asyncio.sleep(10)# Start multiple agentspython agent.py --port 8080 &python agent.py --port 8081 &python agent.py --port 8082 &
# Start orchestratorpython orchestrator.py --port 8090 &
# Run ORCH testsaral-test --orchestrator-url http://localhost:8090 --profile ORCH# Client sends request to orchestratorresponse = requests.post( "http://localhost:8090/invoke", json={ "capability": "web_search", "parameters": { "query": "ARAL specification" } }, headers={ "X-Trace-Id": "trace-123" })
# Orchestrator:# 1. Finds agents with "web_search" capability# 2. Selects healthiest agent# 3. Checks persona constraints# 4. Forwards request with trace context# 5. Returns response to clientdef handle_agent_failure(orchestrator, primary_agent, request): """Handle failure with fallback""" try: return orchestrator.call_agent(primary_agent, request) except AgentUnavailable: # Try fallback fallback = orchestrator.find_fallback(primary_agent) if fallback: return orchestrator.call_agent(fallback, request, degraded=True) else: return { "error": "all_agents_unavailable", "degraded_mode": True }INTEROP Profile
Add protocol layer for external communication
Multi-Agent Example
Specification
Read L6 requirements
© 2026 IbIFACE — CC BY 4.0