Custom Capabilities
Information
Layer 3 Focus: Advanced capability patterns and custom action development
This example shows how to build custom capabilities beyond the built-ins, including external API integrations, database operations, and specialized domain actions.
Custom Capability Types
Section titled “Custom Capability Types”Complete Implementation
Section titled “Complete Implementation”// ==========================================// CAPABILITY INTERFACE// ==========================================interface CapabilityAction { name: string; description: string; category: 'api' | 'database' | 'communication' | 'file' | 'compute'; parameters: { type: 'object'; properties: Record<string, any>; required: string[]; }; handler: (params: any) => Promise<any>; timeout?: number; retryable?: boolean;}
class CustomCapabilityRegistry { private actions: Map<string, CapabilityAction> = new Map(); private executionLog: Array<{ action: string; timestamp: number; duration: number; success: boolean }> = [];
register(action: CapabilityAction): void { if (this.actions.has(action.name)) { throw new Error(`Action ${action.name} already registered`); } this.actions.set(action.name, action); console.log(`✅ Registered: ${action.name}`); }
async execute(name: string, params: any): Promise<any> { const action = this.actions.get(name); if (!action) { throw new Error(`Action ${name} not found`); }
// Validate parameters this.validateParameters(action, params);
const startTime = Date.now(); try { const result = await Promise.race([ action.handler(params), this.timeout(action.timeout || 30000) ]);
const duration = Date.now() - startTime; this.logExecution(name, duration, true);
return result; } catch (error) { const duration = Date.now() - startTime; this.logExecution(name, duration, false);
if (action.retryable) { console.log(`⚠️ Retrying ${name}...`); return await this.execute(name, params); // Simple retry }
throw error; } }
private validateParameters(action: CapabilityAction, params: any): void { for (const required of action.parameters.required) { if (!(required in params)) { throw new Error(`Missing required parameter: ${required}`); } }
for (const [key, value] of Object.entries(params)) { const paramSchema = action.parameters.properties[key]; if (paramSchema && paramSchema.type) { const valueType = typeof value; if (valueType !== paramSchema.type) { throw new Error(`Parameter ${key} must be ${paramSchema.type}, got ${valueType}`); } } } }
private logExecution(name: string, duration: number, success: boolean): void { this.executionLog.push({ action: name, timestamp: Date.now(), duration, success }); }
private timeout(ms: number): Promise<never> { return new Promise((_, reject) => setTimeout(() => reject(new Error('Action timeout')), ms) ); }
getExecutionStats(name?: string): any { const logs = name ? this.executionLog.filter(l => l.action === name) : this.executionLog;
return { totalExecutions: logs.length, successful: logs.filter(l => l.success).length, failed: logs.filter(l => !l.success).length, averageDuration: logs.reduce((sum, l) => sum + l.duration, 0) / (logs.length || 1), lastExecution: logs[logs.length - 1]?.timestamp }; }}
// ==========================================// API CAPABILITIES// ==========================================class APICapabilities { static createRESTAction(name: string, method: string, baseUrl: string): CapabilityAction { return { name, description: `Execute ${method} request to ${baseUrl}`, category: 'api', parameters: { type: 'object', properties: { endpoint: { type: 'string' }, query: { type: 'object' }, body: { type: 'object' } }, required: ['endpoint'] }, handler: async (params) => { const url = new URL(params.endpoint, baseUrl); if (params.query) { Object.entries(params.query).forEach(([key, value]) => { url.searchParams.append(key, String(value)); }); }
const response = await fetch(url.toString(), { method, headers: { 'Content-Type': 'application/json' }, body: params.body ? JSON.stringify(params.body) : undefined });
if (!response.ok) { throw new Error(`API error: ${response.status}`); }
return response.json(); }, timeout: 10000, retryable: true }; }
static createWeatherAction(): CapabilityAction { return { name: 'get_weather', description: 'Get weather for a location', category: 'api', parameters: { type: 'object', properties: { city: { type: 'string' }, units: { type: 'string', default: 'metric' } }, required: ['city'] }, handler: async (params) => { // Mock weather API return { city: params.city, temperature: 22, condition: 'Sunny', humidity: 65, windSpeed: 12 }; } }; }
static createWebSearchAction(): CapabilityAction { return { name: 'web_search', description: 'Search the web for information', category: 'api', parameters: { type: 'object', properties: { query: { type: 'string' }, maxResults: { type: 'number', default: 5 } }, required: ['query'] }, handler: async (params) => { // Mock search results return { query: params.query, results: [ { title: 'Result 1', url: 'https://example.com/1', snippet: 'First result...' }, { title: 'Result 2', url: 'https://example.com/2', snippet: 'Second result...' } ] }; } }; }}
// ==========================================// DATABASE CAPABILITIES// ==========================================class DatabaseCapabilities { private mockDb: Map<string, any[]> = new Map();
createQueryAction(): CapabilityAction { return { name: 'query_database', description: 'Execute a database query', category: 'database', parameters: { type: 'object', properties: { table: { type: 'string' }, filter: { type: 'object' } }, required: ['table'] }, handler: async (params) => { const table = params.table; const data = this.mockDb.get(table) || [];
if (params.filter) { return data.filter(item => Object.entries(params.filter).every( ([key, value]) => item[key] === value ) ); }
return data; } }; }
createInsertAction(): CapabilityAction { return { name: 'insert_record', description: 'Insert a record into database', category: 'database', parameters: { type: 'object', properties: { table: { type: 'string' }, data: { type: 'object' } }, required: ['table', 'data'] }, handler: async (params) => { const table = params.table; if (!this.mockDb.has(table)) { this.mockDb.set(table, []); }
const record = { id: Date.now(), ...params.data, createdAt: new Date().toISOString() };
this.mockDb.get(table)!.push(record); return { id: record.id, success: true }; } }; }
createUpdateAction(): CapabilityAction { return { name: 'update_record', description: 'Update a record in database', category: 'database', parameters: { type: 'object', properties: { table: { type: 'string' }, id: { type: 'number' }, updates: { type: 'object' } }, required: ['table', 'id', 'updates'] }, handler: async (params) => { const table = params.table; const data = this.mockDb.get(table) || []; const record = data.find(r => r.id === params.id);
if (!record) { throw new Error(`Record not found: ${params.id}`); }
Object.assign(record, params.updates, { updatedAt: new Date().toISOString() });
return { id: record.id, success: true }; } }; }}
// ==========================================// COMMUNICATION CAPABILITIES// ==========================================class CommunicationCapabilities { static createEmailAction(): CapabilityAction { return { name: 'send_email', description: 'Send an email', category: 'communication', parameters: { type: 'object', properties: { to: { type: 'string' }, subject: { type: 'string' }, body: { type: 'string' }, html: { type: 'string' } }, required: ['to', 'subject', 'body'] }, handler: async (params) => { console.log(`📧 Email sent to ${params.to}: ${params.subject}`); return { messageId: `msg-${Date.now()}`, sent: true, recipient: params.to }; }, timeout: 5000, retryable: true }; }
static createSlackAction(webhookUrl: string): CapabilityAction { return { name: 'send_slack_message', description: 'Send a message to Slack', category: 'communication', parameters: { type: 'object', properties: { channel: { type: 'string' }, message: { type: 'string' }, blocks: { type: 'object' } }, required: ['channel', 'message'] }, handler: async (params) => { console.log(`💬 Slack: ${params.channel} - ${params.message}`); return { ts: Date.now() / 1000, sent: true, channel: params.channel }; }, retryable: true }; }
static createSMSAction(): CapabilityAction { return { name: 'send_sms', description: 'Send an SMS message', category: 'communication', parameters: { type: 'object', properties: { phone: { type: 'string' }, message: { type: 'string' } }, required: ['phone', 'message'] }, handler: async (params) => { console.log(`📱 SMS to ${params.phone}: ${params.message}`); return { sid: `SM${Date.now()}`, sent: true, phone: params.phone }; } }; }}
// ==========================================// FILE CAPABILITIES// ==========================================class FileCapabilities { static createReadFileAction(): CapabilityAction { return { name: 'read_file', description: 'Read a file from disk', category: 'file', parameters: { type: 'object', properties: { path: { type: 'string' }, encoding: { type: 'string', default: 'utf-8' } }, required: ['path'] }, handler: async (params) => { // Mock file read return { path: params.path, content: 'File content here...', size: 1024, encoding: params.encoding }; } }; }
static createWriteFileAction(): CapabilityAction { return { name: 'write_file', description: 'Write content to a file', category: 'file', parameters: { type: 'object', properties: { path: { type: 'string' }, content: { type: 'string' }, append: { type: 'boolean', default: false } }, required: ['path', 'content'] }, handler: async (params) => { console.log(`📝 Writing to ${params.path}`); return { path: params.path, bytesWritten: params.content.length, success: true }; } }; }
static createListFilesAction(): CapabilityAction { return { name: 'list_files', description: 'List files in a directory', category: 'file', parameters: { type: 'object', properties: { path: { type: 'string' }, recursive: { type: 'boolean', default: false } }, required: ['path'] }, handler: async (params) => { // Mock directory listing return { path: params.path, files: ['file1.txt', 'file2.txt', 'file3.txt'], count: 3 }; } }; }}
// ==========================================// COMPUTE CAPABILITIES// ==========================================class ComputeCapabilities { static createAnalyticsAction(): CapabilityAction { return { name: 'analyze_data', description: 'Analyze data and generate insights', category: 'compute', parameters: { type: 'object', properties: { data: { type: 'object' }, metrics: { type: 'object' } }, required: ['data'] }, handler: async (params) => { return { mean: 42.5, median: 40, stdDev: 8.3, min: 10, max: 95, insights: 'Data is normally distributed' }; } }; }
static createMLPredictionAction(): CapabilityAction { return { name: 'predict', description: 'Make ML-based prediction', category: 'compute', parameters: { type: 'object', properties: { features: { type: 'object' }, model: { type: 'string' } }, required: ['features', 'model'] }, handler: async (params) => { return { prediction: Math.random() > 0.5 ? 'positive' : 'negative', confidence: 0.87, model: params.model }; } }; }}
// ==========================================// USAGE EXAMPLE// ==========================================async function main() { const registry = new CustomCapabilityRegistry();
// Register API capabilities registry.register(APICapabilities.createWeatherAction()); registry.register(APICapabilities.createWebSearchAction());
// Register database capabilities const dbCaps = new DatabaseCapabilities(); registry.register(dbCaps.createQueryAction()); registry.register(dbCaps.createInsertAction()); registry.register(dbCaps.createUpdateAction());
// Register communication capabilities registry.register(CommunicationCapabilities.createEmailAction()); registry.register(CommunicationCapabilities.createSlackAction('https://hooks.slack.com/...')); registry.register(CommunicationCapabilities.createSMSAction());
// Register file capabilities registry.register(FileCapabilities.createReadFileAction()); registry.register(FileCapabilities.createWriteFileAction()); registry.register(FileCapabilities.createListFilesAction());
// Register compute capabilities registry.register(ComputeCapabilities.createAnalyticsAction()); registry.register(ComputeCapabilities.createMLPredictionAction());
// Test executions console.log('\n=== EXECUTING CAPABILITIES ===\n');
try { // API const weather = await registry.execute('get_weather', { city: 'Paris' }); console.log('Weather:', weather);
// Database const inserted = await registry.execute('insert_record', { table: 'users', data: { name: 'Alice', email: 'alice@example.com' } }); console.log('Inserted:', inserted);
// Communication const email = await registry.execute('send_email', { to: 'alice@example.com', subject: 'Hello', body: 'Test email' }); console.log('Email:', email);
// File const listed = await registry.execute('list_files', { path: '/documents' }); console.log('Files:', listed);
// Compute const analysis = await registry.execute('analyze_data', { data: { values: [1, 2, 3, 4, 5] } }); console.log('Analysis:', analysis);
// Stats console.log('\n=== EXECUTION STATS ==='); console.log('Overall:', registry.getExecutionStats()); console.log('get_weather:', registry.getExecutionStats('get_weather')); } catch (error) { console.error('Error:', error); }}
main().catch(console.error);from typing import Dict, Any, List, Callablefrom datetime import datetimeimport asyncio
# ==========================================# CAPABILITY INTERFACE# ==========================================class CapabilityAction: def __init__(self, name: str, description: str, category: str, parameters: dict, handler: Callable, timeout: int = 30000, retryable: bool = False): self.name = name self.description = description self.category = category self.parameters = parameters self.handler = handler self.timeout = timeout self.retryable = retryable
class CustomCapabilityRegistry: def __init__(self): self.actions: Dict[str, CapabilityAction] = {} self.execution_log = []
def register(self, action: CapabilityAction): if action.name in self.actions: raise ValueError(f"Action {action.name} already registered") self.actions[action.name] = action print(f"✅ Registered: {action.name}")
async def execute(self, name: str, params: dict) -> Any: if name not in self.actions: raise ValueError(f"Action {name} not found")
action = self.actions[name]
# Validate parameters self._validate_parameters(action, params)
start_time = datetime.now() try: # Execute with timeout result = await asyncio.wait_for( action.handler(params), timeout=action.timeout / 1000 )
duration = (datetime.now() - start_time).total_seconds() * 1000 self._log_execution(name, duration, True)
return result except asyncio.TimeoutError: duration = (datetime.now() - start_time).total_seconds() * 1000 self._log_execution(name, duration, False) raise Exception("Action timeout") except Exception as error: duration = (datetime.now() - start_time).total_seconds() * 1000 self._log_execution(name, duration, False)
if action.retryable: print(f"⚠️ Retrying {name}...") return await self.execute(name, params)
raise
def _validate_parameters(self, action: CapabilityAction, params: dict): for required in action.parameters.get('required', []): if required not in params: raise ValueError(f"Missing required parameter: {required}")
def _log_execution(self, name: str, duration: float, success: bool): self.execution_log.append({ 'action': name, 'timestamp': datetime.now().isoformat(), 'duration': duration, 'success': success })
def get_execution_stats(self, name: str = None) -> dict: logs = [l for l in self.execution_log if l['action'] == name] if name else self.execution_log
if not logs: return {'total_executions': 0}
return { 'total_executions': len(logs), 'successful': sum(1 for l in logs if l['success']), 'failed': sum(1 for l in logs if not l['success']), 'average_duration': sum(l['duration'] for l in logs) / len(logs), 'last_execution': logs[-1]['timestamp'] }
# ==========================================# API CAPABILITIES# ==========================================class APICapabilities: @staticmethod def create_weather_action() -> CapabilityAction: async def handler(params: dict): return { 'city': params['city'], 'temperature': 22, 'condition': 'Sunny', 'humidity': 65, 'wind_speed': 12 }
return CapabilityAction( name='get_weather', description='Get weather for a location', category='api', parameters={ 'type': 'object', 'properties': { 'city': {'type': 'string'}, 'units': {'type': 'string', 'default': 'metric'} }, 'required': ['city'] }, handler=handler )
@staticmethod def create_web_search_action() -> CapabilityAction: async def handler(params: dict): return { 'query': params['query'], 'results': [ {'title': 'Result 1', 'url': 'https://example.com/1'}, {'title': 'Result 2', 'url': 'https://example.com/2'} ] }
return CapabilityAction( name='web_search', description='Search the web', category='api', parameters={ 'type': 'object', 'properties': { 'query': {'type': 'string'}, 'max_results': {'type': 'number', 'default': 5} }, 'required': ['query'] }, handler=handler )
# ==========================================# DATABASE CAPABILITIES# ==========================================class DatabaseCapabilities: def __init__(self): self.mock_db: Dict[str, List[dict]] = {}
def create_query_action(self) -> CapabilityAction: async def handler(params: dict): table = params['table'] data = self.mock_db.get(table, [])
if 'filter' in params: return [item for item in data if all( item.get(k) == v for k, v in params['filter'].items() )]
return data
return CapabilityAction( name='query_database', description='Execute a database query', category='database', parameters={ 'type': 'object', 'properties': { 'table': {'type': 'string'}, 'filter': {'type': 'object'} }, 'required': ['table'] }, handler=handler )
def create_insert_action(self) -> CapabilityAction: async def handler(params: dict): table = params['table'] if table not in self.mock_db: self.mock_db[table] = []
record = { 'id': int(datetime.now().timestamp() * 1000), **params['data'], 'created_at': datetime.now().isoformat() }
self.mock_db[table].append(record) return {'id': record['id'], 'success': True}
return CapabilityAction( name='insert_record', description='Insert a record', category='database', parameters={ 'type': 'object', 'properties': { 'table': {'type': 'string'}, 'data': {'type': 'object'} }, 'required': ['table', 'data'] }, handler=handler )
# ==========================================# COMMUNICATION CAPABILITIES# ==========================================class CommunicationCapabilities: @staticmethod def create_email_action() -> CapabilityAction: async def handler(params: dict): print(f"📧 Email to {params['to']}: {params['subject']}") return { 'message_id': f"msg-{int(datetime.now().timestamp() * 1000)}", 'sent': True, 'recipient': params['to'] }
return CapabilityAction( name='send_email', description='Send an email', category='communication', parameters={ 'type': 'object', 'properties': { 'to': {'type': 'string'}, 'subject': {'type': 'string'}, 'body': {'type': 'string'} }, 'required': ['to', 'subject', 'body'] }, handler=handler, retryable=True )
@staticmethod def create_slack_action() -> CapabilityAction: async def handler(params: dict): print(f"💬 Slack: {params['channel']} - {params['message']}") return { 'ts': datetime.now().timestamp(), 'sent': True, 'channel': params['channel'] }
return CapabilityAction( name='send_slack_message', description='Send Slack message', category='communication', parameters={ 'type': 'object', 'properties': { 'channel': {'type': 'string'}, 'message': {'type': 'string'} }, 'required': ['channel', 'message'] }, handler=handler, retryable=True )
# ==========================================# COMPUTE CAPABILITIES# ==========================================class ComputeCapabilities: @staticmethod def create_analytics_action() -> CapabilityAction: async def handler(params: dict): return { 'mean': 42.5, 'median': 40, 'std_dev': 8.3, 'min': 10, 'max': 95, 'insights': 'Data is normally distributed' }
return CapabilityAction( name='analyze_data', description='Analyze data', category='compute', parameters={ 'type': 'object', 'properties': { 'data': {'type': 'object'} }, 'required': ['data'] }, handler=handler )
@staticmethod def create_ml_prediction_action() -> CapabilityAction: async def handler(params: dict): import random return { 'prediction': 'positive' if random.random() > 0.5 else 'negative', 'confidence': 0.87, 'model': params['model'] }
return CapabilityAction( name='predict', description='Make ML prediction', category='compute', parameters={ 'type': 'object', 'properties': { 'features': {'type': 'object'}, 'model': {'type': 'string'} }, 'required': ['features', 'model'] }, handler=handler )
# ==========================================# USAGE EXAMPLE# ==========================================async def main(): registry = CustomCapabilityRegistry()
# Register capabilities registry.register(APICapabilities.create_weather_action()) registry.register(APICapabilities.create_web_search_action())
db_caps = DatabaseCapabilities() registry.register(db_caps.create_query_action()) registry.register(db_caps.create_insert_action())
registry.register(CommunicationCapabilities.create_email_action()) registry.register(CommunicationCapabilities.create_slack_action())
registry.register(ComputeCapabilities.create_analytics_action()) registry.register(ComputeCapabilities.create_ml_prediction_action())
# Test executions print('\n=== EXECUTING CAPABILITIES ===\n')
try: weather = await registry.execute('get_weather', {'city': 'Paris'}) print('Weather:', weather)
inserted = await registry.execute('insert_record', { 'table': 'users', 'data': {'name': 'Alice', 'email': 'alice@example.com'} }) print('Inserted:', inserted)
email = await registry.execute('send_email', { 'to': 'alice@example.com', 'subject': 'Hello', 'body': 'Test email' }) print('Email:', email)
analysis = await registry.execute('analyze_data', { 'data': {'values': [1, 2, 3, 4, 5]} }) print('Analysis:', analysis)
print('\n=== EXECUTION STATS ===') print('Overall:', registry.get_execution_stats()) print('get_weather:', registry.get_execution_stats('get_weather'))
except Exception as error: print(f'Error: {error}')
if __name__ == '__main__': asyncio.run(main())Expected Output
Section titled “Expected Output”✅ Registered: get_weather✅ Registered: web_search✅ Registered: query_database✅ Registered: insert_record✅ Registered: send_email✅ Registered: send_slack_message✅ Registered: analyze_data✅ Registered: predict
=== EXECUTING CAPABILITIES ===
Weather: { city: 'Paris', temperature: 22, condition: 'Sunny', humidity: 65, wind_speed: 12}
📧 Email to alice@example.com: HelloEmail: { message_id: 'msg-1736952615000', sent: true, recipient: 'alice@example.com'}
Inserted: { id: 1736952615000, success: true}
💬 Slack: general - Alert: Task completedSlack: { ts: 1736952615.000, sent: true, channel: 'general'}
Analysis: { mean: 42.5, median: 40, std_dev: 8.3, min: 10, max: 95, insights: 'Data is normally distributed'}
=== EXECUTION STATS ===Overall: { total_executions: 5, successful: 5, failed: 0, average_duration: 12.4, last_execution: '2025-01-14T14:30:15Z'}get_weather: { total_executions: 1, successful: 1, failed: 0, average_duration: 5.2, last_execution: '2025-01-14T14:30:15Z'}Capability Categories
Section titled “Capability Categories”network 🌐 API Integrations
REST, GraphQL, webhooks, external services
database 💾 Database Operations
CRUD, queries, transactions, analytics
envelope 📧 Communication
Email, SMS, Slack, webhooks
folder 📁 File Operations
Read, write, upload, process files
cog ⚙️ Computation
Analytics, ML predictions, calculations
lock 🔐 Security
Encryption, verification, authentication
Best Practices
Section titled “Best Practices”Parameter Validation
Always validate required parameters and types before execution.
validateParameters(action, params); // Throws on invalid inputTimeout Handling
Set appropriate timeouts for each capability type.
const result = await Promise.race([ action.handler(params), timeout(action.timeout || 30000)]);Retry Logic
Mark idempotent operations as retryable.
if (action.retryable) { return await this.execute(name, params);}Execution Logging
Track all executions for monitoring and debugging.
logExecution(name, duration, success);getExecutionStats(name); // Get metrics