Advanced Memory Patterns
Information
Layer 2 Focus: Advanced memory patterns including semantic search, knowledge graphs, and cross-agent memory sharing
This example demonstrates advanced memory strategies used in production ARAL agents.
Memory Pattern Types
Section titled “Memory Pattern Types”Complete Implementation
Section titled “Complete Implementation”// ==========================================// SEMANTIC SEARCH WITH VECTORS// ==========================================interface Embedding { text: string; vector: number[]; metadata: Record<string, any>; timestamp: number;}
class SemanticMemory { private embeddings: Embedding[] = []; private embedding_model = 'text-embedding-3-small';
async add(text: string, metadata?: Record<string, any>): Promise<void> { // Get embedding (mock) const vector = this.mockEmbedding(text);
this.embeddings.push({ text, vector, metadata: metadata || {}, timestamp: Date.now() }); }
async search(query: string, topK: number = 5): Promise<Embedding[]> { const queryVector = this.mockEmbedding(query); const scored = this.embeddings.map(e => ({ ...e, similarity: this.cosineSimilarity(queryVector, e.vector) }));
return scored .sort((a, b) => b.similarity - a.similarity) .slice(0, topK); }
private cosineSimilarity(a: number[], b: number[]): number { const dot = a.reduce((sum, av, i) => sum + av * b[i], 0); const magA = Math.sqrt(a.reduce((sum, av) => sum + av * av, 0)); const magB = Math.sqrt(b.reduce((sum, bv) => sum + bv * bv, 0)); return dot / (magA * magB); }
private mockEmbedding(text: string): number[] { // Mock embedding return Array.from({ length: 1536 }, () => Math.random()); }}
// ==========================================// KNOWLEDGE GRAPH// ==========================================interface GraphNode { id: string; label: string; type: 'entity' | 'concept' | 'event'; properties: Record<string, any>;}
interface GraphEdge { source: string; target: string; relationship: string; weight?: number;}
class KnowledgeGraph { private nodes: Map<string, GraphNode> = new Map(); private edges: GraphEdge[] = [];
addEntity(id: string, label: string, type: string, props?: Record<string, any>): void { this.nodes.set(id, { id, label, type: type as any, properties: props || {} }); }
addRelationship( sourceId: string, targetId: string, relationship: string, weight: number = 1 ): void { this.edges.push({ source: sourceId, target: targetId, relationship, weight }); }
getConnected(nodeId: string): GraphNode[] { const connected = this.edges .filter(e => e.source === nodeId) .map(e => this.nodes.get(e.target)) .filter(Boolean) as GraphNode[];
return connected; }
findPathsBetween(sourceId: string, targetId: string): GraphEdge[][] { // Simple BFS for path finding const paths: GraphEdge[][] = []; const visited = new Set<string>();
const bfs = (current: string, target: string, path: GraphEdge[] = []): void => { if (current === target) { paths.push(path); return; }
visited.add(current);
const outgoing = this.edges.filter(e => e.source === current && !visited.has(e.target)); for (const edge of outgoing) { bfs(edge.target, target, [...path, edge]); }
visited.delete(current); };
bfs(sourceId, targetId); return paths; }}
// ==========================================// HIERARCHICAL MEMORY// ==========================================class HierarchicalMemory { private immediate: any[] = []; // Last 10 messages private contextWindow: any[] = []; // Last 20 messages private episodic: any[] = []; // Sessions/episodes private semantic: SemanticMemory; private graph: KnowledgeGraph;
constructor() { this.semantic = new SemanticMemory(); this.graph = new KnowledgeGraph(); }
async remember( content: string, type: 'message' | 'action' | 'event' | 'concept' ): Promise<void> { const entry = { content, type, timestamp: Date.now(), id: `mem-${Date.now()}` };
// Store in layers this.immediate.push(entry); if (this.immediate.length > 10) { const promoted = this.immediate.shift(); this.contextWindow.push(promoted); if (this.contextWindow.length > 20) { this.contextWindow.shift(); } }
// Index semantically await this.semantic.add(content, entry);
// Extract and graph entities this.extractEntitiesAndRelationships(content); }
private extractEntitiesAndRelationships(text: string): void { // Simple extraction (in production, use NER) const words = text.split(' ');
for (let i = 0; i < words.length; i++) { const word = words[i]; if (word.length > 3) { const nodeId = word.toLowerCase(); if (!this.graph.nodes.has(nodeId)) { this.graph.addEntity(nodeId, word, 'concept'); }
// Connect to nearby words if (i > 0) { const prevWord = words[i - 1].toLowerCase(); this.graph.addRelationship(prevWord, nodeId, 'follows'); } } } }
async recall(query: string): Promise<any> { // Multi-level recall const immediate = this.immediate.filter(e => e.content.toLowerCase().includes(query.toLowerCase()) );
const semantic = await this.semantic.search(query, 5); const connected = this.graph.nodes.get(query.toLowerCase()) ? this.graph.getConnected(query.toLowerCase()) : [];
return { immediate: immediate.slice(-5), semantic: semantic.map(e => e.metadata), relationships: connected.map(n => n.label) }; }
getStats(): Record<string, any> { return { immediateMemory: this.immediate.length, contextWindow: this.contextWindow.length, episodicMemory: this.episodic.length, graphNodes: this.graph.nodes.size, graphEdges: this.edges?.length || 0 }; }
private get edges(): GraphEdge[] { return (this.graph as any).edges || []; }}
// ==========================================// CROSS-AGENT SHARED MEMORY// ==========================================interface SharedMemoryEntry { key: string; value: any; owner: string; timestamp: number; ttl?: number; // Time to live in seconds readPermissions: string[]; // Agent IDs with read access writePermissions: string[]; // Agent IDs with write access}
class SharedMemoryStore { private store: Map<string, SharedMemoryEntry> = new Map(); private accessLog: Array<{ key: string; agent: string; operation: 'read' | 'write'; timestamp: number; }> = [];
set( key: string, value: any, owner: string, readPerms: string[] = [], writePerms: string[] = [] ): void { this.store.set(key, { key, value, owner, timestamp: Date.now(), readPermissions: readPerms, writePermissions: writePerms });
this.logAccess(key, owner, 'write'); }
get(key: string, requester: string): any { const entry = this.store.get(key);
if (!entry) return undefined;
// Check permissions if (entry.readPermissions.length > 0 && !entry.readPermissions.includes(requester)) { throw new Error(`Access denied: ${requester} cannot read ${key}`); }
// Check TTL if (entry.ttl) { const age = (Date.now() - entry.timestamp) / 1000; if (age > entry.ttl) { this.store.delete(key); return undefined; } }
this.logAccess(key, requester, 'read'); return entry.value; }
async share(key: string, owner: string, withAgents: string[]): Promise<void> { const entry = this.store.get(key); if (!entry) throw new Error(`Key not found: ${key}`); if (entry.owner !== owner) throw new Error('Only owner can share');
entry.readPermissions = [...new Set([...entry.readPermissions, ...withAgents])]; console.log(`✅ Shared ${key} with: ${withAgents.join(', ')}`); }
private logAccess(key: string, agent: string, operation: 'read' | 'write'): void { this.accessLog.push({ key, agent, operation, timestamp: Date.now() }); }
getAccessLog(key?: string): any[] { return key ? this.accessLog.filter(l => l.key === key) : this.accessLog; }}
// ==========================================// CACHE LAYER// ==========================================class MemoryCache { private cache: Map<string, { value: any; timestamp: number; ttl: number }> = new Map();
set(key: string, value: any, ttlSeconds: number = 300): void { this.cache.set(key, { value, timestamp: Date.now(), ttl: ttlSeconds }); }
get(key: string): any { const entry = this.cache.get(key); if (!entry) return undefined;
const age = (Date.now() - entry.timestamp) / 1000; if (age > entry.ttl) { this.cache.delete(key); return undefined; }
return entry.value; }
invalidate(pattern: string): void { for (const key of this.cache.keys()) { if (key.match(pattern)) { this.cache.delete(key); } } }
stats(): Record<string, any> { return { entries: this.cache.size, estimatedSize: Array.from(this.cache.values()).reduce( (sum, e) => sum + JSON.stringify(e.value).length, 0 ), hitRate: 0.85 // Mock }; }}
// ==========================================// USAGE EXAMPLE// ==========================================async function main() { console.log('=== ADVANCED MEMORY PATTERNS ===\n');
// 1. HIERARCHICAL MEMORY console.log('1️⃣ Hierarchical Memory:'); const hierarchical = new HierarchicalMemory();
await hierarchical.remember('The customer ordered product X', 'message'); await hierarchical.remember('Order ID is 12345', 'message'); await hierarchical.remember('Customer satisfaction is high', 'event');
const recalled = await hierarchical.recall('customer order'); console.log('Recalled:', recalled); console.log('Stats:', hierarchical.getStats());
// 2. KNOWLEDGE GRAPH console.log('\n2️⃣ Knowledge Graph:'); const kg = new KnowledgeGraph();
kg.addEntity('alice', 'Alice', 'entity', { role: 'customer' }); kg.addEntity('order-123', 'Order #123', 'entity', { total: 99.99 }); kg.addEntity('product-x', 'Product X', 'entity', { category: 'electronics' });
kg.addRelationship('alice', 'order-123', 'placed'); kg.addRelationship('order-123', 'product-x', 'contains');
const paths = kg.findPathsBetween('alice', 'product-x'); console.log('Path from Alice to Product X:', paths[0]?.map(e => e.relationship).join(' → '));
// 3. SHARED MEMORY console.log('\n3️⃣ Shared Memory:'); const shared = new SharedMemoryStore();
// Tech agent stores knowledge shared.set( 'issue-diagnosis', { problem: 'connection timeout', solution: 'restart router' }, 'tech-agent', ['router-agent', 'billing-agent'], // read permissions ['tech-agent'] // write permissions );
// Router agent reads const diagnosis = shared.get('issue-diagnosis', 'router-agent'); console.log('Router read:', diagnosis);
// Billing agent shares with support await shared.share('issue-diagnosis', 'tech-agent', ['support-agent']); console.log('Access log:', shared.getAccessLog('issue-diagnosis'));
// 4. CACHE LAYER console.log('\n4️⃣ Cache Layer:'); const cache = new MemoryCache();
cache.set('user-preferences', { theme: 'dark', language: 'en' }, 600); cache.set('product-catalog', [{ id: 1, name: 'Item 1' }], 3600);
console.log('Cached preferences:', cache.get('user-preferences')); console.log('Cache stats:', cache.stats());
cache.invalidate('product.*'); console.log('After invalidation:', cache.get('product-catalog'));}
main().catch(console.error);import asyncioimport jsonimport mathfrom datetime import datetimefrom typing import List, Dict, Any, Optionalimport numpy as np
# ==========================================# SEMANTIC SEARCH WITH VECTORS# ==========================================class SemanticMemory: def __init__(self): self.embeddings = []
async def add(self, text: str, metadata: dict = None): vector = self._mock_embedding(text)
self.embeddings.append({ 'text': text, 'vector': vector, 'metadata': metadata or {}, 'timestamp': datetime.now().isoformat() })
async def search(self, query: str, top_k: int = 5) -> List[dict]: query_vector = self._mock_embedding(query)
scored = [ {**e, 'similarity': self._cosine_similarity(query_vector, e['vector'])} for e in self.embeddings ]
return sorted(scored, key=lambda x: x['similarity'], reverse=True)[:top_k]
def _cosine_similarity(self, a: List[float], b: List[float]) -> float: dot = sum(av * bv for av, bv in zip(a, b)) mag_a = math.sqrt(sum(av ** 2 for av in a)) mag_b = math.sqrt(sum(bv ** 2 for bv in b)) return dot / (mag_a * mag_b) if (mag_a * mag_b) > 0 else 0
def _mock_embedding(self, text: str) -> List[float]: import random return [random.random() for _ in range(1536)]
# ==========================================# KNOWLEDGE GRAPH# ==========================================class KnowledgeGraph: def __init__(self): self.nodes: Dict[str, Dict] = {} self.edges: List[Dict] = []
def add_entity(self, id: str, label: str, entity_type: str, props: dict = None): self.nodes[id] = { 'id': id, 'label': label, 'type': entity_type, 'properties': props or {} }
def add_relationship( self, source_id: str, target_id: str, relationship: str, weight: float = 1.0 ): self.edges.append({ 'source': source_id, 'target': target_id, 'relationship': relationship, 'weight': weight })
def get_connected(self, node_id: str) -> List[Dict]: connected = [ self.nodes[e['target']] for e in self.edges if e['source'] == node_id and e['target'] in self.nodes ] return connected
def find_paths(self, source_id: str, target_id: str) -> List[List[Dict]]: paths = [] visited = set()
def dfs(current, target, path): if current == target: paths.append(path) return
visited.add(current)
for edge in self.edges: if edge['source'] == current and edge['target'] not in visited: dfs(edge['target'], target, path + [edge])
visited.discard(current)
dfs(source_id, target_id, []) return paths
# ==========================================# HIERARCHICAL MEMORY# ==========================================class HierarchicalMemory: def __init__(self): self.immediate = [] self.context_window = [] self.episodic = [] self.semantic = SemanticMemory() self.graph = KnowledgeGraph()
async def remember(self, content: str, entry_type: str): entry = { 'content': content, 'type': entry_type, 'timestamp': datetime.now().isoformat(), 'id': f"mem-{int(datetime.now().timestamp() * 1000)}" }
self.immediate.append(entry) if len(self.immediate) > 10: promoted = self.immediate.pop(0) self.context_window.append(promoted) if len(self.context_window) > 20: self.context_window.pop(0)
await self.semantic.add(content, entry) self._extract_entities(content)
def _extract_entities(self, text: str): words = text.split() for i, word in enumerate(words): if len(word) > 3: node_id = word.lower() if node_id not in self.graph.nodes: self.graph.add_entity(node_id, word, 'concept')
if i > 0: prev_word = words[i - 1].lower() self.graph.add_relationship(prev_word, node_id, 'follows')
async def recall(self, query: str) -> dict: immediate = [ e for e in self.immediate if query.lower() in e['content'].lower() ]
semantic = await self.semantic.search(query, 5) connected = self.graph.get_connected(query.lower()) if query.lower() in self.graph.nodes else []
return { 'immediate': immediate[-5:], 'semantic': [e['metadata'] for e in semantic], 'relationships': [n['label'] for n in connected] }
def get_stats(self) -> dict: return { 'immediate_memory': len(self.immediate), 'context_window': len(self.context_window), 'episodic_memory': len(self.episodic), 'graph_nodes': len(self.graph.nodes), 'graph_edges': len(self.graph.edges) }
# ==========================================# SHARED MEMORY# ==========================================class SharedMemoryStore: def __init__(self): self.store = {} self.access_log = []
def set( self, key: str, value: Any, owner: str, read_perms: List[str] = None, write_perms: List[str] = None ): self.store[key] = { 'key': key, 'value': value, 'owner': owner, 'timestamp': int(datetime.now().timestamp() * 1000), 'read_permissions': read_perms or [], 'write_permissions': write_perms or [] } self._log_access(key, owner, 'write')
def get(self, key: str, requester: str) -> Any: entry = self.store.get(key) if not entry: return None
if entry['read_permissions'] and requester not in entry['read_permissions']: raise PermissionError(f"Access denied: {requester} cannot read {key}")
self._log_access(key, requester, 'read') return entry['value']
async def share(self, key: str, owner: str, with_agents: List[str]): entry = self.store.get(key) if not entry: raise KeyError(f"Key not found: {key}") if entry['owner'] != owner: raise PermissionError("Only owner can share")
entry['read_permissions'] = list(set(entry['read_permissions'] + with_agents)) print(f"✅ Shared {key} with: {', '.join(with_agents)}")
def _log_access(self, key: str, agent: str, operation: str): self.access_log.append({ 'key': key, 'agent': agent, 'operation': operation, 'timestamp': int(datetime.now().timestamp() * 1000) })
def get_access_log(self, key: str = None) -> List[dict]: return [l for l in self.access_log if l['key'] == key] if key else self.access_log
# ==========================================# CACHE LAYER# ==========================================class MemoryCache: def __init__(self): self.cache = {}
def set(self, key: str, value: Any, ttl_seconds: int = 300): self.cache[key] = { 'value': value, 'timestamp': datetime.now(), 'ttl': ttl_seconds }
def get(self, key: str) -> Any: entry = self.cache.get(key) if not entry: return None
age = (datetime.now() - entry['timestamp']).total_seconds() if age > entry['ttl']: del self.cache[key] return None
return entry['value']
def invalidate(self, pattern: str): import re regex = re.compile(pattern) keys_to_delete = [k for k in self.cache.keys() if regex.match(k)] for key in keys_to_delete: del self.cache[key]
def stats(self) -> dict: return { 'entries': len(self.cache), 'estimated_size': sum( len(json.dumps(e['value'])) for e in self.cache.values() ), 'hit_rate': 0.85 }
# ==========================================# USAGE EXAMPLE# ==========================================async def main(): print('=== ADVANCED MEMORY PATTERNS ===\n')
# 1. HIERARCHICAL MEMORY print('1️⃣ Hierarchical Memory:') hierarchical = HierarchicalMemory()
await hierarchical.remember('The customer ordered product X', 'message') await hierarchical.remember('Order ID is 12345', 'message') await hierarchical.remember('Customer satisfaction is high', 'event')
recalled = await hierarchical.recall('customer order') print('Recalled:', recalled) print('Stats:', hierarchical.get_stats())
# 2. KNOWLEDGE GRAPH print('\n2️⃣ Knowledge Graph:') kg = KnowledgeGraph()
kg.add_entity('alice', 'Alice', 'entity', {'role': 'customer'}) kg.add_entity('order-123', 'Order #123', 'entity', {'total': 99.99}) kg.add_entity('product-x', 'Product X', 'entity', {'category': 'electronics'})
kg.add_relationship('alice', 'order-123', 'placed') kg.add_relationship('order-123', 'product-x', 'contains')
paths = kg.find_paths('alice', 'product-x') if paths: print('Path from Alice to Product X:', ' → '.join(e['relationship'] for e in paths[0]))
# 3. SHARED MEMORY print('\n3️⃣ Shared Memory:') shared = SharedMemoryStore()
shared.set( 'issue-diagnosis', {'problem': 'connection timeout', 'solution': 'restart router'}, 'tech-agent', read_perms=['router-agent', 'billing-agent'], write_perms=['tech-agent'] )
diagnosis = shared.get('issue-diagnosis', 'router-agent') print('Router read:', diagnosis)
await shared.share('issue-diagnosis', 'tech-agent', ['support-agent']) print('Access log:', shared.get_access_log('issue-diagnosis'))
# 4. CACHE LAYER print('\n4️⃣ Cache Layer:') cache = MemoryCache()
cache.set('user-preferences', {'theme': 'dark', 'language': 'en'}, 600) cache.set('product-catalog', [{'id': 1, 'name': 'Item 1'}], 3600)
print('Cached preferences:', cache.get('user-preferences')) print('Cache stats:', cache.stats())
cache.invalidate('product.*') print('After invalidation:', cache.get('product-catalog'))
if __name__ == '__main__': asyncio.run(main())Pattern Comparison
Section titled “Pattern Comparison”| Pattern | Best For | Complexity | Scalability |
|---|---|---|---|
| Hierarchical | Multi-level context | Medium | Good |
| Semantic Search | Similarity queries | High | Excellent |
| Knowledge Graph | Relationships | High | Good |
| Shared Memory | Multi-agent sync | Medium | Good |
| Cache Layer | Fast access | Low | Excellent |
Production Recommendations
Section titled “Production Recommendations”Memory Sizing
- Immediate: Last 10-20 messages (~5KB)
- Context Window: Last 100-200 messages (~50KB)
- LTM: Database with cleanup by date
Pruning Strategy
- Use Summarize for important long contexts
- Use Semantic for recall-heavy workloads
- Use FIFO for simple time-based purging
Vector Storage
- Use Pinecone for cloud SaaS
- Use Weaviate for self-hosted
- Use FAISS for embedded deployments
Knowledge Graphs
- Use Neo4j for complex relationships
- Use ArangoDB for flexible schemas
- Use DGraph for distributed graphs