Skip to content

Custom Capabilities

Information

Layer 3 Focus: Advanced capability patterns and custom action development

This example shows how to build custom capabilities beyond the built-ins, including external API integrations, database operations, and specialized domain actions.


Action Registry

API Integrations

REST, GraphQL

Database

CRUD, Analytics

Communication

Email, SMS, Slack

File Operations

Read, Write, Process

Computation

Analysis, ML


custom-capabilities.ts
// ==========================================
// CAPABILITY INTERFACE
// ==========================================
interface CapabilityAction {
name: string;
description: string;
category: 'api' | 'database' | 'communication' | 'file' | 'compute';
parameters: {
type: 'object';
properties: Record<string, any>;
required: string[];
};
handler: (params: any) => Promise<any>;
timeout?: number;
retryable?: boolean;
}
class CustomCapabilityRegistry {
private actions: Map<string, CapabilityAction> = new Map();
private executionLog: Array<{ action: string; timestamp: number; duration: number; success: boolean }> = [];
register(action: CapabilityAction): void {
if (this.actions.has(action.name)) {
throw new Error(`Action ${action.name} already registered`);
}
this.actions.set(action.name, action);
console.log(`✅ Registered: ${action.name}`);
}
async execute(name: string, params: any): Promise<any> {
const action = this.actions.get(name);
if (!action) {
throw new Error(`Action ${name} not found`);
}
// Validate parameters
this.validateParameters(action, params);
const startTime = Date.now();
try {
const result = await Promise.race([
action.handler(params),
this.timeout(action.timeout || 30000)
]);
const duration = Date.now() - startTime;
this.logExecution(name, duration, true);
return result;
} catch (error) {
const duration = Date.now() - startTime;
this.logExecution(name, duration, false);
if (action.retryable) {
console.log(`⚠️ Retrying ${name}...`);
return await this.execute(name, params); // Simple retry
}
throw error;
}
}
private validateParameters(action: CapabilityAction, params: any): void {
for (const required of action.parameters.required) {
if (!(required in params)) {
throw new Error(`Missing required parameter: ${required}`);
}
}
for (const [key, value] of Object.entries(params)) {
const paramSchema = action.parameters.properties[key];
if (paramSchema && paramSchema.type) {
const valueType = typeof value;
if (valueType !== paramSchema.type) {
throw new Error(`Parameter ${key} must be ${paramSchema.type}, got ${valueType}`);
}
}
}
}
private logExecution(name: string, duration: number, success: boolean): void {
this.executionLog.push({
action: name,
timestamp: Date.now(),
duration,
success
});
}
private timeout(ms: number): Promise<never> {
return new Promise((_, reject) =>
setTimeout(() => reject(new Error('Action timeout')), ms)
);
}
getExecutionStats(name?: string): any {
const logs = name ? this.executionLog.filter(l => l.action === name) : this.executionLog;
return {
totalExecutions: logs.length,
successful: logs.filter(l => l.success).length,
failed: logs.filter(l => !l.success).length,
averageDuration: logs.reduce((sum, l) => sum + l.duration, 0) / (logs.length || 1),
lastExecution: logs[logs.length - 1]?.timestamp
};
}
}
// ==========================================
// API CAPABILITIES
// ==========================================
class APICapabilities {
static createRESTAction(name: string, method: string, baseUrl: string): CapabilityAction {
return {
name,
description: `Execute ${method} request to ${baseUrl}`,
category: 'api',
parameters: {
type: 'object',
properties: {
endpoint: { type: 'string' },
query: { type: 'object' },
body: { type: 'object' }
},
required: ['endpoint']
},
handler: async (params) => {
const url = new URL(params.endpoint, baseUrl);
if (params.query) {
Object.entries(params.query).forEach(([key, value]) => {
url.searchParams.append(key, String(value));
});
}
const response = await fetch(url.toString(), {
method,
headers: { 'Content-Type': 'application/json' },
body: params.body ? JSON.stringify(params.body) : undefined
});
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
return response.json();
},
timeout: 10000,
retryable: true
};
}
static createWeatherAction(): CapabilityAction {
return {
name: 'get_weather',
description: 'Get weather for a location',
category: 'api',
parameters: {
type: 'object',
properties: {
city: { type: 'string' },
units: { type: 'string', default: 'metric' }
},
required: ['city']
},
handler: async (params) => {
// Mock weather API
return {
city: params.city,
temperature: 22,
condition: 'Sunny',
humidity: 65,
windSpeed: 12
};
}
};
}
static createWebSearchAction(): CapabilityAction {
return {
name: 'web_search',
description: 'Search the web for information',
category: 'api',
parameters: {
type: 'object',
properties: {
query: { type: 'string' },
maxResults: { type: 'number', default: 5 }
},
required: ['query']
},
handler: async (params) => {
// Mock search results
return {
query: params.query,
results: [
{ title: 'Result 1', url: 'https://example.com/1', snippet: 'First result...' },
{ title: 'Result 2', url: 'https://example.com/2', snippet: 'Second result...' }
]
};
}
};
}
}
// ==========================================
// DATABASE CAPABILITIES
// ==========================================
class DatabaseCapabilities {
private mockDb: Map<string, any[]> = new Map();
createQueryAction(): CapabilityAction {
return {
name: 'query_database',
description: 'Execute a database query',
category: 'database',
parameters: {
type: 'object',
properties: {
table: { type: 'string' },
filter: { type: 'object' }
},
required: ['table']
},
handler: async (params) => {
const table = params.table;
const data = this.mockDb.get(table) || [];
if (params.filter) {
return data.filter(item =>
Object.entries(params.filter).every(
([key, value]) => item[key] === value
)
);
}
return data;
}
};
}
createInsertAction(): CapabilityAction {
return {
name: 'insert_record',
description: 'Insert a record into database',
category: 'database',
parameters: {
type: 'object',
properties: {
table: { type: 'string' },
data: { type: 'object' }
},
required: ['table', 'data']
},
handler: async (params) => {
const table = params.table;
if (!this.mockDb.has(table)) {
this.mockDb.set(table, []);
}
const record = {
id: Date.now(),
...params.data,
createdAt: new Date().toISOString()
};
this.mockDb.get(table)!.push(record);
return { id: record.id, success: true };
}
};
}
createUpdateAction(): CapabilityAction {
return {
name: 'update_record',
description: 'Update a record in database',
category: 'database',
parameters: {
type: 'object',
properties: {
table: { type: 'string' },
id: { type: 'number' },
updates: { type: 'object' }
},
required: ['table', 'id', 'updates']
},
handler: async (params) => {
const table = params.table;
const data = this.mockDb.get(table) || [];
const record = data.find(r => r.id === params.id);
if (!record) {
throw new Error(`Record not found: ${params.id}`);
}
Object.assign(record, params.updates, {
updatedAt: new Date().toISOString()
});
return { id: record.id, success: true };
}
};
}
}
// ==========================================
// COMMUNICATION CAPABILITIES
// ==========================================
class CommunicationCapabilities {
static createEmailAction(): CapabilityAction {
return {
name: 'send_email',
description: 'Send an email',
category: 'communication',
parameters: {
type: 'object',
properties: {
to: { type: 'string' },
subject: { type: 'string' },
body: { type: 'string' },
html: { type: 'string' }
},
required: ['to', 'subject', 'body']
},
handler: async (params) => {
console.log(`📧 Email sent to ${params.to}: ${params.subject}`);
return {
messageId: `msg-${Date.now()}`,
sent: true,
recipient: params.to
};
},
timeout: 5000,
retryable: true
};
}
static createSlackAction(webhookUrl: string): CapabilityAction {
return {
name: 'send_slack_message',
description: 'Send a message to Slack',
category: 'communication',
parameters: {
type: 'object',
properties: {
channel: { type: 'string' },
message: { type: 'string' },
blocks: { type: 'object' }
},
required: ['channel', 'message']
},
handler: async (params) => {
console.log(`💬 Slack: ${params.channel} - ${params.message}`);
return {
ts: Date.now() / 1000,
sent: true,
channel: params.channel
};
},
retryable: true
};
}
static createSMSAction(): CapabilityAction {
return {
name: 'send_sms',
description: 'Send an SMS message',
category: 'communication',
parameters: {
type: 'object',
properties: {
phone: { type: 'string' },
message: { type: 'string' }
},
required: ['phone', 'message']
},
handler: async (params) => {
console.log(`📱 SMS to ${params.phone}: ${params.message}`);
return {
sid: `SM${Date.now()}`,
sent: true,
phone: params.phone
};
}
};
}
}
// ==========================================
// FILE CAPABILITIES
// ==========================================
class FileCapabilities {
static createReadFileAction(): CapabilityAction {
return {
name: 'read_file',
description: 'Read a file from disk',
category: 'file',
parameters: {
type: 'object',
properties: {
path: { type: 'string' },
encoding: { type: 'string', default: 'utf-8' }
},
required: ['path']
},
handler: async (params) => {
// Mock file read
return {
path: params.path,
content: 'File content here...',
size: 1024,
encoding: params.encoding
};
}
};
}
static createWriteFileAction(): CapabilityAction {
return {
name: 'write_file',
description: 'Write content to a file',
category: 'file',
parameters: {
type: 'object',
properties: {
path: { type: 'string' },
content: { type: 'string' },
append: { type: 'boolean', default: false }
},
required: ['path', 'content']
},
handler: async (params) => {
console.log(`📝 Writing to ${params.path}`);
return {
path: params.path,
bytesWritten: params.content.length,
success: true
};
}
};
}
static createListFilesAction(): CapabilityAction {
return {
name: 'list_files',
description: 'List files in a directory',
category: 'file',
parameters: {
type: 'object',
properties: {
path: { type: 'string' },
recursive: { type: 'boolean', default: false }
},
required: ['path']
},
handler: async (params) => {
// Mock directory listing
return {
path: params.path,
files: ['file1.txt', 'file2.txt', 'file3.txt'],
count: 3
};
}
};
}
}
// ==========================================
// COMPUTE CAPABILITIES
// ==========================================
class ComputeCapabilities {
static createAnalyticsAction(): CapabilityAction {
return {
name: 'analyze_data',
description: 'Analyze data and generate insights',
category: 'compute',
parameters: {
type: 'object',
properties: {
data: { type: 'object' },
metrics: { type: 'object' }
},
required: ['data']
},
handler: async (params) => {
return {
mean: 42.5,
median: 40,
stdDev: 8.3,
min: 10,
max: 95,
insights: 'Data is normally distributed'
};
}
};
}
static createMLPredictionAction(): CapabilityAction {
return {
name: 'predict',
description: 'Make ML-based prediction',
category: 'compute',
parameters: {
type: 'object',
properties: {
features: { type: 'object' },
model: { type: 'string' }
},
required: ['features', 'model']
},
handler: async (params) => {
return {
prediction: Math.random() > 0.5 ? 'positive' : 'negative',
confidence: 0.87,
model: params.model
};
}
};
}
}
// ==========================================
// USAGE EXAMPLE
// ==========================================
async function main() {
const registry = new CustomCapabilityRegistry();
// Register API capabilities
registry.register(APICapabilities.createWeatherAction());
registry.register(APICapabilities.createWebSearchAction());
// Register database capabilities
const dbCaps = new DatabaseCapabilities();
registry.register(dbCaps.createQueryAction());
registry.register(dbCaps.createInsertAction());
registry.register(dbCaps.createUpdateAction());
// Register communication capabilities
registry.register(CommunicationCapabilities.createEmailAction());
registry.register(CommunicationCapabilities.createSlackAction('https://hooks.slack.com/...'));
registry.register(CommunicationCapabilities.createSMSAction());
// Register file capabilities
registry.register(FileCapabilities.createReadFileAction());
registry.register(FileCapabilities.createWriteFileAction());
registry.register(FileCapabilities.createListFilesAction());
// Register compute capabilities
registry.register(ComputeCapabilities.createAnalyticsAction());
registry.register(ComputeCapabilities.createMLPredictionAction());
// Test executions
console.log('\n=== EXECUTING CAPABILITIES ===\n');
try {
// API
const weather = await registry.execute('get_weather', { city: 'Paris' });
console.log('Weather:', weather);
// Database
const inserted = await registry.execute('insert_record', {
table: 'users',
data: { name: 'Alice', email: 'alice@example.com' }
});
console.log('Inserted:', inserted);
// Communication
const email = await registry.execute('send_email', {
to: 'alice@example.com',
subject: 'Hello',
body: 'Test email'
});
console.log('Email:', email);
// File
const listed = await registry.execute('list_files', { path: '/documents' });
console.log('Files:', listed);
// Compute
const analysis = await registry.execute('analyze_data', {
data: { values: [1, 2, 3, 4, 5] }
});
console.log('Analysis:', analysis);
// Stats
console.log('\n=== EXECUTION STATS ===');
console.log('Overall:', registry.getExecutionStats());
console.log('get_weather:', registry.getExecutionStats('get_weather'));
} catch (error) {
console.error('Error:', error);
}
}
main().catch(console.error);
custom_capabilities.py
from typing import Dict, Any, List, Callable
from datetime import datetime
import asyncio
# ==========================================
# CAPABILITY INTERFACE
# ==========================================
class CapabilityAction:
def __init__(self, name: str, description: str, category: str,
parameters: dict, handler: Callable,
timeout: int = 30000, retryable: bool = False):
self.name = name
self.description = description
self.category = category
self.parameters = parameters
self.handler = handler
self.timeout = timeout
self.retryable = retryable
class CustomCapabilityRegistry:
def __init__(self):
self.actions: Dict[str, CapabilityAction] = {}
self.execution_log = []
def register(self, action: CapabilityAction):
if action.name in self.actions:
raise ValueError(f"Action {action.name} already registered")
self.actions[action.name] = action
print(f"✅ Registered: {action.name}")
async def execute(self, name: str, params: dict) -> Any:
if name not in self.actions:
raise ValueError(f"Action {name} not found")
action = self.actions[name]
# Validate parameters
self._validate_parameters(action, params)
start_time = datetime.now()
try:
# Execute with timeout
result = await asyncio.wait_for(
action.handler(params),
timeout=action.timeout / 1000
)
duration = (datetime.now() - start_time).total_seconds() * 1000
self._log_execution(name, duration, True)
return result
except asyncio.TimeoutError:
duration = (datetime.now() - start_time).total_seconds() * 1000
self._log_execution(name, duration, False)
raise Exception("Action timeout")
except Exception as error:
duration = (datetime.now() - start_time).total_seconds() * 1000
self._log_execution(name, duration, False)
if action.retryable:
print(f"⚠️ Retrying {name}...")
return await self.execute(name, params)
raise
def _validate_parameters(self, action: CapabilityAction, params: dict):
for required in action.parameters.get('required', []):
if required not in params:
raise ValueError(f"Missing required parameter: {required}")
def _log_execution(self, name: str, duration: float, success: bool):
self.execution_log.append({
'action': name,
'timestamp': datetime.now().isoformat(),
'duration': duration,
'success': success
})
def get_execution_stats(self, name: str = None) -> dict:
logs = [l for l in self.execution_log if l['action'] == name] if name else self.execution_log
if not logs:
return {'total_executions': 0}
return {
'total_executions': len(logs),
'successful': sum(1 for l in logs if l['success']),
'failed': sum(1 for l in logs if not l['success']),
'average_duration': sum(l['duration'] for l in logs) / len(logs),
'last_execution': logs[-1]['timestamp']
}
# ==========================================
# API CAPABILITIES
# ==========================================
class APICapabilities:
@staticmethod
def create_weather_action() -> CapabilityAction:
async def handler(params: dict):
return {
'city': params['city'],
'temperature': 22,
'condition': 'Sunny',
'humidity': 65,
'wind_speed': 12
}
return CapabilityAction(
name='get_weather',
description='Get weather for a location',
category='api',
parameters={
'type': 'object',
'properties': {
'city': {'type': 'string'},
'units': {'type': 'string', 'default': 'metric'}
},
'required': ['city']
},
handler=handler
)
@staticmethod
def create_web_search_action() -> CapabilityAction:
async def handler(params: dict):
return {
'query': params['query'],
'results': [
{'title': 'Result 1', 'url': 'https://example.com/1'},
{'title': 'Result 2', 'url': 'https://example.com/2'}
]
}
return CapabilityAction(
name='web_search',
description='Search the web',
category='api',
parameters={
'type': 'object',
'properties': {
'query': {'type': 'string'},
'max_results': {'type': 'number', 'default': 5}
},
'required': ['query']
},
handler=handler
)
# ==========================================
# DATABASE CAPABILITIES
# ==========================================
class DatabaseCapabilities:
def __init__(self):
self.mock_db: Dict[str, List[dict]] = {}
def create_query_action(self) -> CapabilityAction:
async def handler(params: dict):
table = params['table']
data = self.mock_db.get(table, [])
if 'filter' in params:
return [item for item in data if all(
item.get(k) == v for k, v in params['filter'].items()
)]
return data
return CapabilityAction(
name='query_database',
description='Execute a database query',
category='database',
parameters={
'type': 'object',
'properties': {
'table': {'type': 'string'},
'filter': {'type': 'object'}
},
'required': ['table']
},
handler=handler
)
def create_insert_action(self) -> CapabilityAction:
async def handler(params: dict):
table = params['table']
if table not in self.mock_db:
self.mock_db[table] = []
record = {
'id': int(datetime.now().timestamp() * 1000),
**params['data'],
'created_at': datetime.now().isoformat()
}
self.mock_db[table].append(record)
return {'id': record['id'], 'success': True}
return CapabilityAction(
name='insert_record',
description='Insert a record',
category='database',
parameters={
'type': 'object',
'properties': {
'table': {'type': 'string'},
'data': {'type': 'object'}
},
'required': ['table', 'data']
},
handler=handler
)
# ==========================================
# COMMUNICATION CAPABILITIES
# ==========================================
class CommunicationCapabilities:
@staticmethod
def create_email_action() -> CapabilityAction:
async def handler(params: dict):
print(f"📧 Email to {params['to']}: {params['subject']}")
return {
'message_id': f"msg-{int(datetime.now().timestamp() * 1000)}",
'sent': True,
'recipient': params['to']
}
return CapabilityAction(
name='send_email',
description='Send an email',
category='communication',
parameters={
'type': 'object',
'properties': {
'to': {'type': 'string'},
'subject': {'type': 'string'},
'body': {'type': 'string'}
},
'required': ['to', 'subject', 'body']
},
handler=handler,
retryable=True
)
@staticmethod
def create_slack_action() -> CapabilityAction:
async def handler(params: dict):
print(f"💬 Slack: {params['channel']} - {params['message']}")
return {
'ts': datetime.now().timestamp(),
'sent': True,
'channel': params['channel']
}
return CapabilityAction(
name='send_slack_message',
description='Send Slack message',
category='communication',
parameters={
'type': 'object',
'properties': {
'channel': {'type': 'string'},
'message': {'type': 'string'}
},
'required': ['channel', 'message']
},
handler=handler,
retryable=True
)
# ==========================================
# COMPUTE CAPABILITIES
# ==========================================
class ComputeCapabilities:
@staticmethod
def create_analytics_action() -> CapabilityAction:
async def handler(params: dict):
return {
'mean': 42.5,
'median': 40,
'std_dev': 8.3,
'min': 10,
'max': 95,
'insights': 'Data is normally distributed'
}
return CapabilityAction(
name='analyze_data',
description='Analyze data',
category='compute',
parameters={
'type': 'object',
'properties': {
'data': {'type': 'object'}
},
'required': ['data']
},
handler=handler
)
@staticmethod
def create_ml_prediction_action() -> CapabilityAction:
async def handler(params: dict):
import random
return {
'prediction': 'positive' if random.random() > 0.5 else 'negative',
'confidence': 0.87,
'model': params['model']
}
return CapabilityAction(
name='predict',
description='Make ML prediction',
category='compute',
parameters={
'type': 'object',
'properties': {
'features': {'type': 'object'},
'model': {'type': 'string'}
},
'required': ['features', 'model']
},
handler=handler
)
# ==========================================
# USAGE EXAMPLE
# ==========================================
async def main():
registry = CustomCapabilityRegistry()
# Register capabilities
registry.register(APICapabilities.create_weather_action())
registry.register(APICapabilities.create_web_search_action())
db_caps = DatabaseCapabilities()
registry.register(db_caps.create_query_action())
registry.register(db_caps.create_insert_action())
registry.register(CommunicationCapabilities.create_email_action())
registry.register(CommunicationCapabilities.create_slack_action())
registry.register(ComputeCapabilities.create_analytics_action())
registry.register(ComputeCapabilities.create_ml_prediction_action())
# Test executions
print('\n=== EXECUTING CAPABILITIES ===\n')
try:
weather = await registry.execute('get_weather', {'city': 'Paris'})
print('Weather:', weather)
inserted = await registry.execute('insert_record', {
'table': 'users',
'data': {'name': 'Alice', 'email': 'alice@example.com'}
})
print('Inserted:', inserted)
email = await registry.execute('send_email', {
'to': 'alice@example.com',
'subject': 'Hello',
'body': 'Test email'
})
print('Email:', email)
analysis = await registry.execute('analyze_data', {
'data': {'values': [1, 2, 3, 4, 5]}
})
print('Analysis:', analysis)
print('\n=== EXECUTION STATS ===')
print('Overall:', registry.get_execution_stats())
print('get_weather:', registry.get_execution_stats('get_weather'))
except Exception as error:
print(f'Error: {error}')
if __name__ == '__main__':
asyncio.run(main())

✅ Registered: get_weather
✅ Registered: web_search
✅ Registered: query_database
✅ Registered: insert_record
✅ Registered: send_email
✅ Registered: send_slack_message
✅ Registered: analyze_data
✅ Registered: predict
=== EXECUTING CAPABILITIES ===
Weather: {
city: 'Paris',
temperature: 22,
condition: 'Sunny',
humidity: 65,
wind_speed: 12
}
📧 Email to alice@example.com: Hello
Email: {
message_id: 'msg-1736952615000',
sent: true,
recipient: 'alice@example.com'
}
Inserted: {
id: 1736952615000,
success: true
}
💬 Slack: general - Alert: Task completed
Slack: {
ts: 1736952615.000,
sent: true,
channel: 'general'
}
Analysis: {
mean: 42.5,
median: 40,
std_dev: 8.3,
min: 10,
max: 95,
insights: 'Data is normally distributed'
}
=== EXECUTION STATS ===
Overall: {
total_executions: 5,
successful: 5,
failed: 0,
average_duration: 12.4,
last_execution: '2025-01-14T14:30:15Z'
}
get_weather: {
total_executions: 1,
successful: 1,
failed: 0,
average_duration: 5.2,
last_execution: '2025-01-14T14:30:15Z'
}


Parameter Validation

Always validate required parameters and types before execution.

validateParameters(action, params); // Throws on invalid input
Timeout Handling

Set appropriate timeouts for each capability type.

const result = await Promise.race([
action.handler(params),
timeout(action.timeout || 30000)
]);
Retry Logic

Mark idempotent operations as retryable.

if (action.retryable) {
return await this.execute(name, params);
}
Execution Logging

Track all executions for monitoring and debugging.

logExecution(name, duration, success);
getExecutionStats(name); // Get metrics