Skip to content

Implementing INTEROP Profile

The INTEROP profile extends ORCH with Layer 7 (Protocol) for external system integration.


INTEROP profile = ORCH profile (L1-L6) + Layer 7 (Protocol)

This enables:

  • Model Context Protocol (MCP) support
  • REST API exposure
  • WebSocket connections
  • External authentication
  • Protocol translation

  • ✅ ORCH profile compliance (L1-L6)
  • ✅ TLS certificates configured
  • ✅ Authentication provider (OAuth2/OIDC)
  • ✅ API documentation tools
  • MCP compatibility
  • REST API endpoints
  • Message validation
  • Envelope transformation
  • Authentication (OAuth2/OIDC)
  • Rate limiting
  • TLS 1.3+ support
  • OpenAPI specification

from mcp import MCPServer
from aral import ProtocolLayer
# Create the MCP server instance; tools are registered on this object
mcp_server = MCPServer(
    name="aral-agent",
    version="1.0.0",
)
# Register tools (capabilities)
@mcp_server.tool("web_search")
async def web_search(query: str, max_results: int = 10):
    """Search the web by forwarding the request to the orchestrator.

    The MCP tool call is translated into an ARAL envelope whose source is
    Layer 7 (protocol ``mcp``) and whose destination is the ``web_search``
    capability on Layer 3, then passed to ``orchestrator.process_envelope``.

    Args:
        query: Search query text.
        max_results: Maximum number of results to request.

    Returns:
        The ``"payload"`` entry of the orchestrator's response.
    """
    # Imported locally because the surrounding snippet does not import them
    # before this function is defined.
    import uuid
    from datetime import datetime, timezone

    # Translate to ARAL Envelope
    envelope = {
        "id": str(uuid.uuid4()),
        "trace_id": str(uuid.uuid4()),
        # Timezone-aware UTC keeps timestamps unambiguous across hosts.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": {"layer": "L7", "protocol": "mcp"},
        "destination": {"layer": "L3", "capability_id": "web_search"},
        "payload": {
            "query": query,
            "max_results": max_results,
        },
    }
    # Forward to orchestrator
    result = await orchestrator.process_envelope(envelope)
    return result["payload"]
# Start MCP server, selecting the "stdio" transport
mcp_server.run(transport="stdio")

from flask import Flask, request, jsonify
from functools import wraps
import jwt
# Flask application object on which the REST routes are registered
app = Flask(__name__)
def require_auth(f):
    """Authentication decorator.

    Wraps a Flask view so that it only runs when the request carries a valid
    RS256-signed JWT in an ``Authorization: Bearer <token>`` header.

    On success the decoded claims are stored on ``request.user`` and the view
    is called. Otherwise a JSON ``{"error": "unauthorized"}`` body is returned
    with status 401 and the view is not called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Parse the header strictly instead of deleting "Bearer " wherever it
        # occurs, so non-Bearer or empty credentials are rejected up front.
        header = request.headers.get('Authorization', '')
        scheme, _, token = header.partition(' ')
        token = token.strip()
        if scheme.lower() != 'bearer' or not token:
            return jsonify({"error": "unauthorized"}), 401
        try:
            payload = jwt.decode(token, public_key, algorithms=['RS256'])
        except jwt.InvalidTokenError:
            return jsonify({"error": "unauthorized"}), 401
        # The view runs outside the ``try`` so that a token error raised by
        # the view itself is not misreported as an authentication failure.
        request.user = payload
        return f(*args, **kwargs)
    return decorated
def rate_limit(f):
    """Rate limiting decorator.

    Identifies the client by the ``sub`` claim in ``request.user``, so it must
    run after ``require_auth`` has populated that attribute (list it beneath
    ``@require_auth`` on the view).

    When ``rate_limiter.check`` refuses the request, a JSON 429 response is
    returned with the wait time both in the body and in a ``Retry-After``
    header. If no authenticated user is present, a JSON 401 is returned
    instead of failing with an attribute error.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        user = getattr(request, 'user', None)
        if not user:
            return jsonify({"error": "unauthorized"}), 401
        client_id = user.get('sub')
        if not rate_limiter.check(client_id):
            retry_after = rate_limiter.retry_after(client_id)
            return jsonify({
                "error": "rate_limit_exceeded",
                "retry_after_seconds": retry_after
            }), 429, {"Retry-After": str(retry_after)}
        return f(*args, **kwargs)
    return decorated
@app.route('/api/v1/agents/<agent_id>/invoke', methods=['POST'])
@require_auth
@rate_limit
def invoke_agent(agent_id):
    """Invoke agent capability.

    Reads ``capability`` and optional ``parameters`` from the JSON request
    body, wraps them in an ARAL envelope (source Layer 7 / ``rest``,
    destination Layer 3) and forwards it to the orchestrator.

    Args:
        agent_id: Agent identifier taken from the URL path.

    Returns:
        The orchestrator result as JSON with status 200; a JSON
        ``invalid_request`` error with status 400 when the body or the
        envelope is not valid; a JSON ``internal_error`` (including the
        trace id) with status 500 when the orchestrator call raises.
    """
    # Imported locally because the surrounding snippet does not import them
    # before this function is defined.
    import logging
    import uuid
    from datetime import datetime, timezone

    # Extract parameters. ``silent=True`` yields None for a missing or
    # malformed JSON body so it can be answered with this API's own 400.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "invalid_request"}), 400
    capability = body.get('capability')
    parameters = body.get('parameters', {})
    if not isinstance(capability, str) or not capability:
        return jsonify({"error": "invalid_request"}), 400
    if not isinstance(parameters, dict):
        return jsonify({"error": "invalid_request"}), 400
    # Reuse the caller's trace id when provided; otherwise start a new trace.
    trace_id = request.headers.get('X-Trace-Id') or str(uuid.uuid4())

    # Create envelope
    envelope = {
        "id": str(uuid.uuid4()),
        "trace_id": trace_id,
        # Timezone-aware UTC keeps timestamps unambiguous across hosts.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": "1.0",
        "source": {
            "layer": "L7",
            "protocol": "rest",
            "client_id": request.user['sub']
        },
        "destination": {
            "layer": "L3",
            "agent_id": agent_id,
            "capability_id": capability
        },
        "payload": parameters,
        "metadata": {
            "auth_method": "oauth2",
            "api_version": "v1"
        }
    }

    # Validate envelope
    if not validate_envelope(envelope):
        return jsonify({"error": "invalid_request"}), 400

    # Forward to orchestrator
    # NOTE(review): the MCP example awaits orchestrator.process_envelope;
    # confirm that a synchronous call is valid here.
    try:
        result = orchestrator.process_envelope(envelope)
    except Exception:
        # Boundary handler: record the failure with its trace id, and keep
        # internal details out of the response.
        logging.getLogger(__name__).exception(
            "invoke failed (trace_id=%s)", trace_id
        )
        return jsonify({
            "error": "internal_error",
            "trace_id": trace_id
        }), 500
    return jsonify(result), 200
@app.route('/api/v1/agents/<agent_id>/status', methods=['GET'])
@require_auth
def get_agent_status(agent_id):
    """Return the status report of one agent as JSON, or a 404 error."""
    agent = orchestrator.get_agent(agent_id)
    if agent:
        status_report = {
            "agent_id": agent.id,
            # A health score above 0.7 is reported as healthy.
            "status": "healthy" if agent.health_score > 0.7 else "degraded",
            "health_score": agent.health_score,
            "uptime_seconds": agent.uptime,
            "capabilities": agent.capabilities,
            "version": agent.version,
        }
        return jsonify(status_report)
    return jsonify({"error": "not_found"}), 404
@app.route('/api/v1/capabilities', methods=['GET'])
@require_auth
def list_capabilities():
    """Return every capability known to the orchestrator as JSON."""
    described = []
    for capability in orchestrator.get_all_capabilities():
        described.append({
            "id": capability.id,
            "name": capability.name,
            "description": capability.description,
            "input_schema": capability.input_schema,
            "permissions": capability.permissions,
        })
    return jsonify({"capabilities": described})

from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6750 import BearerTokenValidator
class MyBearerTokenValidator(BearerTokenValidator):
    """Bearer-token validator that verifies tokens as RS256-signed JWTs."""

    def authenticate_token(self, token_string):
        """Decode ``token_string`` and return its claims, or None if invalid.

        The token must verify against ``public_key`` with RS256 and carry the
        ``aral-api`` audience.
        """
        # NOTE(review): Authlib may expect a token object (with expiry,
        # revocation and scope accessors) rather than a plain claims dict;
        # confirm against the installed Authlib version.
        decode_options = {"algorithms": ['RS256'], "audience": 'aral-api'}
        try:
            # Validate JWT token
            claims = jwt.decode(token_string, public_key, **decode_options)
        except jwt.InvalidTokenError:
            return None
        return claims
# Create the resource protector and register the JWT bearer-token validator with it
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyBearerTokenValidator())

from redis import Redis
from datetime import datetime, timedelta
class RateLimiter:
    """Fixed-window, per-client request limiter backed by Redis.

    Each client gets a counter key ``rate_limit:<client_id>``. The counter is
    incremented on every request and expires ``WINDOW_SECONDS`` after the
    first request of the window, so at most ``requests_per_minute`` requests
    are allowed per window.

    Args:
        redis_client: Client object providing ``incr``, ``expire`` and ``ttl``.
        requests_per_minute: Maximum number of requests allowed per window.
    """

    # Length of one counting window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, redis_client, requests_per_minute=60):
        self.redis = redis_client
        self.limit = requests_per_minute

    def _key(self, client_id):
        """Return the Redis key holding the counter for ``client_id``."""
        return f"rate_limit:{client_id}"

    def check(self, client_id):
        """Check if request is allowed.

        Counts the request and returns True while the client is within its
        limit for the current window, False otherwise.
        """
        key = self._key(client_id)
        # Increment first and decide on the returned value: a single INCR
        # avoids the read-then-write race of checking before counting.
        count = self.redis.incr(key)
        if count == 1:
            # Start the window only on its first request. Refreshing the
            # expiry on every request would keep the counter alive forever
            # for a steadily active client.
            self.redis.expire(key, self.WINDOW_SECONDS)
        return count <= self.limit

    def retry_after(self, client_id):
        """Get retry-after seconds.

        Returns the remaining lifetime of the client's counter, clamped to 0
        when Redis reports a negative TTL (missing key or no expiry).
        """
        ttl = self.redis.ttl(self._key(client_id))
        return max(ttl, 0)

from flask_socketio import SocketIO, emit, join_room
# NOTE(review): cors_allowed_origins="*" accepts connections from any origin;
# confirm this is intended outside local development.
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('connect')
def handle_connect():
    """Handle WebSocket connection.

    Reads the JWT from the ``token`` query parameter and verifies it with
    RS256. On success a ``connected`` event is emitted; when the token is
    missing or invalid the handler returns False to refuse the connection.
    """
    token = request.args.get('token')
    if not token:
        # Reject explicitly rather than handing a missing token to the JWT
        # library.
        return False
    try:
        jwt.decode(token, public_key, algorithms=['RS256'])
    except jwt.InvalidTokenError:
        return False
    # Emitted outside the ``try`` so only token verification is guarded.
    emit('connected', {'status': 'authenticated'})
@socketio.on('subscribe')
def handle_subscribe(data):
    """Join the caller to the requested agent's room and acknowledge it."""
    requested_agent = data.get('agent_id')
    room_name = f"agent:{requested_agent}"
    join_room(room_name)
    emit('subscribed', {'agent_id': requested_agent})
@socketio.on('invoke')
def handle_invoke(data):
    """Run a capability requested over WebSocket and emit its result."""
    request_envelope = create_envelope(data, protocol="websocket")
    emit('result', orchestrator.process_envelope(request_envelope))

from flask_swagger_ui import get_swaggerui_blueprint
# Generate OpenAPI spec
# Describes the invoke endpoint: bearer-token authentication, the JSON
# request body, and every status code the handler can return.
openapi_spec = {
    "openapi": "3.1.0",
    "info": {
        "title": "ARAL Agent API",
        "version": "1.0.0",
        "description": "API for interacting with ARAL-compliant agents"
    },
    "servers": [
        {"url": "https://api.example.com/api/v1"}
    ],
    "components": {
        "securitySchemes": {
            "bearerAuth": {
                "type": "http",
                "scheme": "bearer",
                "bearerFormat": "JWT"
            }
        }
    },
    "security": [{"bearerAuth": []}],
    "paths": {
        "/agents/{agent_id}/invoke": {
            "post": {
                "summary": "Invoke agent capability",
                "parameters": [
                    {
                        "name": "agent_id",
                        "in": "path",
                        "required": True,
                        "schema": {"type": "string"}
                    }
                ],
                "requestBody": {
                    "required": True,
                    "content": {
                        "application/json": {
                            "schema": {
                                "type": "object",
                                "properties": {
                                    "capability": {"type": "string"},
                                    "parameters": {"type": "object"}
                                },
                                "required": ["capability"]
                            }
                        }
                    }
                },
                "responses": {
                    "200": {"description": "Success"},
                    "400": {"description": "Invalid request"},
                    "401": {"description": "Unauthorized"},
                    "429": {"description": "Rate limit exceeded"},
                    "500": {"description": "Internal error"}
                }
            }
        }
    }
}
# Serve Swagger UI
# The UI is mounted at SWAGGER_URL and is pointed at API_URL for the spec.
# NOTE(review): no route serving API_URL is visible in this snippet; confirm
# that the OpenAPI document is actually exposed at that path.
SWAGGER_URL = '/docs'
API_URL = '/api/v1/openapi.json'
swaggerui_blueprint = get_swaggerui_blueprint(SWAGGER_URL, API_URL)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)

import ssl
# Create SSL context
# Purpose.CLIENT_AUTH selects the defaults for a server-side context.
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('server.crt', 'server.key')
# Refuse any protocol version older than TLS 1.3.
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
# Run with TLS
# NOTE(review): a SocketIO instance wraps this app elsewhere in the file;
# confirm whether the server should be started through it instead.
app.run(
host='0.0.0.0',
port=443,
ssl_context=ssl_context
)

Terminal window
# Test MCP
mcp-test --server stdio --command "python agent.py"
# Test REST API
curl -X POST https://localhost:443/api/v1/agents/agent-1/invoke \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"capability": "web_search", "parameters": {"query": "ARAL"}}'
# Test WebSocket
# The URL is quoted so the shell does not treat "?" as a glob character.
wscat -c "wss://localhost:443/ws?token=$TOKEN"
# Run full INTEROP tests
aral-test --api-url https://localhost:443 --profile INTEROP

  1. Always Use TLS: Encrypt all external communication
  2. Implement Rate Limiting: Protect against abuse
  3. Validate All Inputs: Never trust external data
  4. Log External Requests: Maintain audit trail
  5. Document Your API: Generate OpenAPI specs
  6. Use Standard Protocols: Support MCP, REST, WebSocket
  7. Handle Errors Gracefully: Return clear, actionable error messages without exposing internal details
  8. Monitor API Usage: Track latency, errors, rate limits

Monitor & Debug

Set up observability and tracing


© 2026 IbIFACE — CC BY 4.0