Deploy to Production
Review security best practices
The INTEROP profile extends ORCH with Layer 7 (Protocol) for external system integration.
INTEROP profile = ORCH profile (L1-L6) + Layer 7 (Protocol)
This enables:
from mcp import MCPServer
from aral import ProtocolLayer

# Create MCP server
mcp_server = MCPServer(
    name="aral-agent",
    version="1.0.0",
)
# Register tools (capabilities)
@mcp_server.tool("web_search")
async def web_search(query: str, max_results: int = 10):
    """Search the web by forwarding the call to the orchestrator.

    The MCP tool call is wrapped in an ARAL envelope whose source is
    Layer 7 (protocol ``mcp``) and whose destination is the ``web_search``
    capability on Layer 3.

    Args:
        query: Value forwarded as ``payload["query"]``.
        max_results: Value forwarded as ``payload["max_results"]``.

    Returns:
        The ``"payload"`` entry of the orchestrator's response.
    """
    # Imported locally: no module-level import of uuid/datetime precedes
    # this snippet, so the names would otherwise be undefined here.
    import uuid
    from datetime import datetime

    # Translate to ARAL Envelope
    envelope = {
        "id": str(uuid.uuid4()),
        "trace_id": str(uuid.uuid4()),
        "timestamp": datetime.now().isoformat(),
        "source": {"layer": "L7", "protocol": "mcp"},
        "destination": {"layer": "L3", "capability_id": "web_search"},
        "payload": {
            "query": query,
            "max_results": max_results,
        },
    }

    # Forward to orchestrator
    result = await orchestrator.process_envelope(envelope)

    return result["payload"]
# Start MCP server
mcp_server.run(transport="stdio")

from flask import Flask, request, jsonify
from functools import wraps
import jwt

# Flask application that hosts the REST endpoints
app = Flask(__name__)
def require_auth(f):
    """Authentication decorator.

    Reads the bearer token from the ``Authorization`` header, verifies it
    as an RS256 JWT against ``public_key`` and stores the decoded claims on
    ``request.user`` before calling the wrapped view.

    Returns:
        The wrapped view's response, or ``({"error": "unauthorized"}, 401)``
        when the token is missing or fails verification.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization', '').replace('Bearer ', '')
        try:
            payload = jwt.decode(token, public_key, algorithms=['RS256'])
        except jwt.InvalidTokenError:
            return jsonify({"error": "unauthorized"}), 401

        # The view is called outside the try block so that an
        # InvalidTokenError raised by the view itself is not misreported
        # as a failed authentication.
        request.user = payload
        return f(*args, **kwargs)
    return decorated
def rate_limit(f):
    """Rate limiting decorator.

    Reads the client id from ``request.user['sub']``, so it must run after
    ``require_auth`` has populated ``request.user``.

    Returns:
        The wrapped view's response, or a 429 JSON error carrying
        ``retry_after_seconds`` when ``rate_limiter.check`` refuses the
        client.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        client_id = request.user.get('sub')
        if not rate_limiter.check(client_id):
            return jsonify({
                "error": "rate_limit_exceeded",
                "retry_after_seconds": rate_limiter.retry_after(client_id),
            }), 429
        return f(*args, **kwargs)
    return decorated


@app.route('/api/v1/agents/<agent_id>/invoke', methods=['POST'])
@require_auth
@rate_limit
def invoke_agent(agent_id):
    """Invoke agent capability.

    Builds an ARAL envelope from the JSON request body (``capability`` and
    optional ``parameters``), validates it and forwards it to the
    orchestrator.

    Args:
        agent_id: Agent identifier taken from the URL path.

    Returns:
        ``(orchestrator result, 200)`` on success, ``400`` when the envelope
        fails validation, ``500`` with the trace id when processing raises.
    """
    # Imported locally: uuid is not imported at module level in this file,
    # and the datetime import appears only further down.
    import logging
    import uuid
    from datetime import datetime

    # Extract parameters
    capability = request.json.get('capability')
    parameters = request.json.get('parameters', {})
    trace_id = request.headers.get('X-Trace-Id', str(uuid.uuid4()))

    # Create envelope
    envelope = {
        "id": str(uuid.uuid4()),
        "trace_id": trace_id,
        "timestamp": datetime.now().isoformat(),
        "version": "1.0",
        "source": {
            "layer": "L7",
            "protocol": "rest",
            "client_id": request.user['sub'],
        },
        "destination": {
            "layer": "L3",
            "agent_id": agent_id,
            "capability_id": capability,
        },
        "payload": parameters,
        "metadata": {
            "auth_method": "oauth2",
            "api_version": "v1",
        },
    }

    # Validate envelope
    if not validate_envelope(envelope):
        return jsonify({"error": "invalid_request"}), 400

    # Forward to orchestrator
    # NOTE(review): web_search awaits orchestrator.process_envelope(); here
    # it is called synchronously — confirm the orchestrator offers a
    # blocking variant for this code path.
    try:
        result = orchestrator.process_envelope(envelope)
        return jsonify(result), 200
    except Exception:
        # Top-level boundary: keep the generic 500 for the client, but
        # record the failure so the trace id can be correlated with a cause.
        logging.getLogger(__name__).exception(
            "invoke_agent failed (trace_id=%s)", trace_id
        )
        return jsonify({
            "error": "internal_error",
            "trace_id": trace_id,
        }), 500
@app.route('/api/v1/agents/<agent_id>/status', methods=['GET'])
@require_auth
def get_agent_status(agent_id):
    """Get agent status.

    Looks the agent up through the orchestrator and reports its health,
    uptime, capabilities and version; responds 404 when the lookup yields
    nothing.
    """
    agent = orchestrator.get_agent(agent_id)
    if not agent:
        return jsonify({"error": "not_found"}), 404

    # Anything at or below 0.7 is reported as degraded.
    if agent.health_score > 0.7:
        status_label = "healthy"
    else:
        status_label = "degraded"

    report = {
        "agent_id": agent.id,
        "status": status_label,
        "health_score": agent.health_score,
        "uptime_seconds": agent.uptime,
        "capabilities": agent.capabilities,
        "version": agent.version,
    }
    return jsonify(report)
@app.route('/api/v1/capabilities', methods=['GET'])
@require_auth
def list_capabilities():
    """List all available capabilities.

    Each capability returned by the orchestrator is reduced to its id,
    name, description, input schema and permissions.
    """
    entries = []
    for cap in orchestrator.get_all_capabilities():
        entries.append({
            "id": cap.id,
            "name": cap.name,
            "description": cap.description,
            "input_schema": cap.input_schema,
            "permissions": cap.permissions,
        })
    return jsonify({"capabilities": entries})


from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6750 import BearerTokenValidator
class MyBearerTokenValidator(BearerTokenValidator):
    """Bearer-token validator that verifies tokens as RS256 JWTs."""

    def authenticate_token(self, token_string):
        """Decode ``token_string`` and return its claims, or ``None``.

        The token must verify against ``public_key`` and carry the
        ``aral-api`` audience.
        """
        # NOTE(review): this returns the decoded claims dict — confirm the
        # installed Authlib version accepts a dict here rather than a token
        # object.
        # Validate JWT token
        try:
            claims = jwt.decode(
                token_string,
                public_key,
                algorithms=['RS256'],
                audience='aral-api',
            )
        except jwt.InvalidTokenError:
            return None
        return claims
# Resource protector wired to the JWT bearer-token validator
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyBearerTokenValidator())

from redis import Redis
from datetime import datetime, timedelta
class RateLimiter:
    """Fixed-window request limiter backed by Redis.

    Each client has one counter key, ``rate_limit:<client_id>``. The key is
    given a TTL when the first request of a window arrives and is never
    extended afterwards, so the counter resets once per window instead of
    accumulating for as long as the client stays active.
    """

    # Length of one rate-limit window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, redis_client, requests_per_minute=60):
        """Store the Redis client and the per-window request limit."""
        self.redis = redis_client
        self.limit = requests_per_minute

    def check(self, client_id):
        """Check if request is allowed.

        Counts the request and returns ``True`` while the client is within
        its limit for the current window. Refused requests are counted too;
        they do not lengthen the window.
        """
        key = f"rate_limit:{client_id}"

        # INCR and TTL run in a single pipeline so the count returned
        # belongs to this request; a separate GET followed by INCR lets
        # concurrent requests slip past the limit.
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.ttl(key)
        count, ttl = pipe.execute()

        # A negative TTL means the key has no expiry yet (first request of
        # a window). Start the window here and never reset it later.
        if ttl < 0:
            self.redis.expire(key, self.WINDOW_SECONDS)

        return count <= self.limit

    def retry_after(self, client_id):
        """Get retry-after seconds.

        Returns the remaining TTL of the client's counter key, clamped to
        zero when the key has no TTL or does not exist.
        """
        key = f"rate_limit:{client_id}"
        ttl = self.redis.ttl(key)
        return max(ttl, 0)


from flask_socketio import SocketIO, emit, join_room
# Socket.IO server attached to the Flask app.
# NOTE(review): cors_allowed_origins="*" accepts connections from any
# origin — confirm this is intended outside local development.
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('connect')
def handle_connect():
    """Handle WebSocket connection.

    The JWT is read from the ``token`` query parameter. A token that fails
    verification makes the handler return ``False``; otherwise a
    ``connected`` event is emitted.
    """
    token = request.args.get('token')
    try:
        jwt.decode(token, public_key, algorithms=['RS256'])
    except jwt.InvalidTokenError:
        return False
    emit('connected', {'status': 'authenticated'})
@socketio.on('subscribe')
def handle_subscribe(data):
    """Subscribe to agent events.

    Joins the room named after the requested agent and acknowledges with a
    ``subscribed`` event that echoes the agent id.
    """
    agent_id = data.get('agent_id')
    room_name = f"agent:{agent_id}"
    join_room(room_name)
    emit('subscribed', {'agent_id': agent_id})
@socketio.on('invoke')
def handle_invoke(data):
    """Invoke capability via WebSocket.

    Wraps the incoming message in an envelope tagged with the
    ``websocket`` protocol, hands it to the orchestrator and emits the
    outcome as a ``result`` event.
    """
    outcome = orchestrator.process_envelope(
        create_envelope(data, protocol="websocket")
    )
    emit('result', outcome)


from flask_swagger_ui import get_swaggerui_blueprint
# Generate OpenAPI spec
# Describes the invoke endpoint. The response list covers every status the
# invoke handler in this file returns: 200, 400 (invalid_request), 401,
# 429 and 500 (internal_error).
openapi_spec = {
    "openapi": "3.1.0",
    "info": {
        "title": "ARAL Agent API",
        "version": "1.0.0",
        "description": "API for interacting with ARAL-compliant agents",
    },
    "servers": [
        {"url": "https://api.example.com/api/v1"},
    ],
    "paths": {
        "/agents/{agent_id}/invoke": {
            "post": {
                "summary": "Invoke agent capability",
                "parameters": [
                    {
                        "name": "agent_id",
                        "in": "path",
                        "required": True,
                        "schema": {"type": "string"},
                    },
                ],
                "requestBody": {
                    "content": {
                        "application/json": {
                            "schema": {
                                "type": "object",
                                "properties": {
                                    "capability": {"type": "string"},
                                    "parameters": {"type": "object"},
                                },
                            },
                        },
                    },
                },
                "responses": {
                    "200": {"description": "Success"},
                    "400": {"description": "Invalid request"},
                    "401": {"description": "Unauthorized"},
                    "429": {"description": "Rate limit exceeded"},
                    "500": {"description": "Internal error"},
                },
            },
        },
    },
}
# Serve Swagger UI
# NOTE(review): no route serving the spec at API_URL is visible in this
# file — confirm one is registered elsewhere, or the UI cannot load it.
SWAGGER_URL = '/docs'
API_URL = '/api/v1/openapi.json'
swaggerui_blueprint = get_swaggerui_blueprint(SWAGGER_URL, API_URL)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)

import ssl
# Create SSL context
# Server-side context: loads the certificate/key pair and refuses anything
# older than TLS 1.3.
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('server.crt', 'server.key')
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
# Run with TLS
app.run(
    host='0.0.0.0',
    port=443,
    ssl_context=ssl_context,
)

# Test MCP (shell command):
#   mcp-test --server stdio --command "python agent.py"
# Test REST API (shell command):
#   curl -X POST https://localhost:443/api/v1/agents/agent-1/invoke \
#     -H "Authorization: Bearer $TOKEN" \
#     -H "Content-Type: application/json" \
#     -d '{"capability": "web_search", "parameters": {"query": "ARAL"}}'
# Test WebSocket (shell command):
#   wscat -c wss://localhost:443/ws?token=$TOKEN
# Run full INTEROP tests (shell command):
#   aral-test --api-url https://localhost:443 --profile INTEROP

Deploy to Production
Review security best practices
Monitor & Debug
Set up observability and tracing
Scale Horizontally
Deploy multiple orchestrators
© 2026 IbIFACE — CC BY 4.0