
Advanced Memory Patterns

Information

Layer 2 Focus: Advanced memory patterns including semantic search, knowledge graphs, and cross-agent memory sharing

This example demonstrates advanced memory strategies used in production ARAL agents.


The memory layers and supporting stores used in this example:

  • Short-Term Memory: context window
  • Long-Term Memory: persistence
  • Pruning Strategies: FIFO / Summarize / Semantic
  • Vector Store: semantic search
  • Knowledge Graph: relationships
  • Cache Layer: fast access

memory-patterns.ts
// ==========================================
// SEMANTIC SEARCH WITH VECTORS
// ==========================================
interface Embedding {
text: string;
vector: number[];
metadata: Record<string, any>;
timestamp: number;
}
class SemanticMemory {
private embeddings: Embedding[] = [];
private embedding_model = 'text-embedding-3-small';
async add(text: string, metadata?: Record<string, any>): Promise<void> {
// Get embedding (mock)
const vector = this.mockEmbedding(text);
this.embeddings.push({
text,
vector,
metadata: metadata || {},
timestamp: Date.now()
});
}
async search(query: string, topK: number = 5): Promise<Embedding[]> {
const queryVector = this.mockEmbedding(query);
const scored = this.embeddings.map(e => ({
...e,
similarity: this.cosineSimilarity(queryVector, e.vector)
}));
return scored
.sort((a, b) => b.similarity - a.similarity)
.slice(0, topK);
}
private cosineSimilarity(a: number[], b: number[]): number {
const dot = a.reduce((sum, av, i) => sum + av * b[i], 0);
const magA = Math.sqrt(a.reduce((sum, av) => sum + av * av, 0));
const magB = Math.sqrt(b.reduce((sum, bv) => sum + bv * bv, 0));
return magA && magB ? dot / (magA * magB) : 0;
}
private mockEmbedding(text: string): number[] {
// Mock embedding
return Array.from({ length: 1536 }, () => Math.random());
}
}
// ==========================================
// KNOWLEDGE GRAPH
// ==========================================
interface GraphNode {
id: string;
label: string;
type: 'entity' | 'concept' | 'event';
properties: Record<string, any>;
}
interface GraphEdge {
source: string;
target: string;
relationship: string;
weight?: number;
}
class KnowledgeGraph {
private nodes: Map<string, GraphNode> = new Map();
private edges: GraphEdge[] = [];
addEntity(id: string, label: string, type: GraphNode['type'], props?: Record<string, any>): void {
this.nodes.set(id, {
id,
label,
type,
properties: props || {}
});
}
addRelationship(
sourceId: string,
targetId: string,
relationship: string,
weight: number = 1
): void {
this.edges.push({
source: sourceId,
target: targetId,
relationship,
weight
});
}
getConnected(nodeId: string): GraphNode[] {
const connected = this.edges
.filter(e => e.source === nodeId)
.map(e => this.nodes.get(e.target))
.filter(Boolean) as GraphNode[];
return connected;
}
// Public accessors so callers outside the class don't reach into private state
hasNode(nodeId: string): boolean {
return this.nodes.has(nodeId);
}
nodeCount(): number {
return this.nodes.size;
}
edgeCount(): number {
return this.edges.length;
}
findPathsBetween(sourceId: string, targetId: string): GraphEdge[][] {
// Depth-first search that enumerates all simple paths between two nodes
const paths: GraphEdge[][] = [];
const visited = new Set<string>();
const dfs = (current: string, target: string, path: GraphEdge[] = []): void => {
if (current === target) {
paths.push(path);
return;
}
visited.add(current);
const outgoing = this.edges.filter(e => e.source === current && !visited.has(e.target));
for (const edge of outgoing) {
dfs(edge.target, target, [...path, edge]);
}
visited.delete(current);
};
dfs(sourceId, targetId);
return paths;
}
}
// ==========================================
// HIERARCHICAL MEMORY
// ==========================================
class HierarchicalMemory {
private immediate: any[] = []; // Last 10 messages
private contextWindow: any[] = []; // Last 20 messages
private episodic: any[] = []; // Sessions/episodes
private semantic: SemanticMemory;
private graph: KnowledgeGraph;
constructor() {
this.semantic = new SemanticMemory();
this.graph = new KnowledgeGraph();
}
async remember(
content: string,
type: 'message' | 'action' | 'event' | 'concept'
): Promise<void> {
const entry = {
content,
type,
timestamp: Date.now(),
id: `mem-${Date.now()}`
};
// Store in layers
this.immediate.push(entry);
if (this.immediate.length > 10) {
const promoted = this.immediate.shift();
this.contextWindow.push(promoted);
if (this.contextWindow.length > 20) {
this.contextWindow.shift();
}
}
// Index semantically
await this.semantic.add(content, entry);
// Extract and graph entities
this.extractEntitiesAndRelationships(content);
}
private extractEntitiesAndRelationships(text: string): void {
// Simple extraction (in production, use NER)
const words = text.split(' ');
for (let i = 0; i < words.length; i++) {
const word = words[i];
if (word.length > 3) {
const nodeId = word.toLowerCase();
if (!this.graph.hasNode(nodeId)) {
this.graph.addEntity(nodeId, word, 'concept');
}
// Connect to nearby words
if (i > 0) {
const prevWord = words[i - 1].toLowerCase();
this.graph.addRelationship(prevWord, nodeId, 'follows');
}
}
}
}
async recall(query: string): Promise<any> {
// Multi-level recall
const immediate = this.immediate.filter(e =>
e.content.toLowerCase().includes(query.toLowerCase())
);
const semantic = await this.semantic.search(query, 5);
const connected = this.graph.hasNode(query.toLowerCase())
? this.graph.getConnected(query.toLowerCase())
: [];
return {
immediate: immediate.slice(-5),
semantic: semantic.map(e => e.metadata),
relationships: connected.map(n => n.label)
};
}
getStats(): Record<string, any> {
return {
immediateMemory: this.immediate.length,
contextWindow: this.contextWindow.length,
episodicMemory: this.episodic.length,
graphNodes: this.graph.nodeCount(),
graphEdges: this.graph.edgeCount()
};
}
}
// ==========================================
// CROSS-AGENT SHARED MEMORY
// ==========================================
interface SharedMemoryEntry {
key: string;
value: any;
owner: string;
timestamp: number;
ttl?: number; // Time to live in seconds
readPermissions: string[]; // Agent IDs with read access
writePermissions: string[]; // Agent IDs with write access
}
class SharedMemoryStore {
private store: Map<string, SharedMemoryEntry> = new Map();
private accessLog: Array<{
key: string;
agent: string;
operation: 'read' | 'write';
timestamp: number;
}> = [];
set(
key: string,
value: any,
owner: string,
readPerms: string[] = [],
writePerms: string[] = [],
ttl?: number // Optional time to live in seconds, checked in get()
): void {
this.store.set(key, {
key,
value,
owner,
timestamp: Date.now(),
ttl,
readPermissions: readPerms,
writePermissions: writePerms
});
this.logAccess(key, owner, 'write');
}
get(key: string, requester: string): any {
const entry = this.store.get(key);
if (!entry) return undefined;
// Check permissions
if (entry.readPermissions.length > 0 && !entry.readPermissions.includes(requester)) {
throw new Error(`Access denied: ${requester} cannot read ${key}`);
}
// Check TTL
if (entry.ttl) {
const age = (Date.now() - entry.timestamp) / 1000;
if (age > entry.ttl) {
this.store.delete(key);
return undefined;
}
}
this.logAccess(key, requester, 'read');
return entry.value;
}
async share(key: string, owner: string, withAgents: string[]): Promise<void> {
const entry = this.store.get(key);
if (!entry) throw new Error(`Key not found: ${key}`);
if (entry.owner !== owner) throw new Error('Only owner can share');
entry.readPermissions = [...new Set([...entry.readPermissions, ...withAgents])];
console.log(`✅ Shared ${key} with: ${withAgents.join(', ')}`);
}
private logAccess(key: string, agent: string, operation: 'read' | 'write'): void {
this.accessLog.push({
key,
agent,
operation,
timestamp: Date.now()
});
}
getAccessLog(key?: string): any[] {
return key
? this.accessLog.filter(l => l.key === key)
: this.accessLog;
}
}
// ==========================================
// CACHE LAYER
// ==========================================
class MemoryCache {
private cache: Map<string, { value: any; timestamp: number; ttl: number }> = new Map();
set(key: string, value: any, ttlSeconds: number = 300): void {
this.cache.set(key, {
value,
timestamp: Date.now(),
ttl: ttlSeconds
});
}
get(key: string): any {
const entry = this.cache.get(key);
if (!entry) return undefined;
const age = (Date.now() - entry.timestamp) / 1000;
if (age > entry.ttl) {
this.cache.delete(key);
return undefined;
}
return entry.value;
}
invalidate(pattern: string): void {
for (const key of this.cache.keys()) {
if (key.match(pattern)) {
this.cache.delete(key);
}
}
}
stats(): Record<string, any> {
return {
entries: this.cache.size,
estimatedSize: Array.from(this.cache.values()).reduce(
(sum, e) => sum + JSON.stringify(e.value).length,
0
),
hitRate: 0.85 // Mock
};
}
}
// ==========================================
// USAGE EXAMPLE
// ==========================================
async function main() {
console.log('=== ADVANCED MEMORY PATTERNS ===\n');
// 1. HIERARCHICAL MEMORY
console.log('1️⃣ Hierarchical Memory:');
const hierarchical = new HierarchicalMemory();
await hierarchical.remember('The customer ordered product X', 'message');
await hierarchical.remember('Order ID is 12345', 'message');
await hierarchical.remember('Customer satisfaction is high', 'event');
const recalled = await hierarchical.recall('customer order');
console.log('Recalled:', recalled);
console.log('Stats:', hierarchical.getStats());
// 2. KNOWLEDGE GRAPH
console.log('\n2️⃣ Knowledge Graph:');
const kg = new KnowledgeGraph();
kg.addEntity('alice', 'Alice', 'entity', { role: 'customer' });
kg.addEntity('order-123', 'Order #123', 'entity', { total: 99.99 });
kg.addEntity('product-x', 'Product X', 'entity', { category: 'electronics' });
kg.addRelationship('alice', 'order-123', 'placed');
kg.addRelationship('order-123', 'product-x', 'contains');
const paths = kg.findPathsBetween('alice', 'product-x');
console.log('Path from Alice to Product X:', paths[0]?.map(e => e.relationship).join(' -> '));
// 3. SHARED MEMORY
console.log('\n3️⃣ Shared Memory:');
const shared = new SharedMemoryStore();
// Tech agent stores knowledge
shared.set(
'issue-diagnosis',
{ problem: 'connection timeout', solution: 'restart router' },
'tech-agent',
['router-agent', 'billing-agent'], // read permissions
['tech-agent'] // write permissions
);
// Router agent reads
const diagnosis = shared.get('issue-diagnosis', 'router-agent');
console.log('Router read:', diagnosis);
// Tech agent (the owner) shares with the support agent
await shared.share('issue-diagnosis', 'tech-agent', ['support-agent']);
console.log('Access log:', shared.getAccessLog('issue-diagnosis'));
// 4. CACHE LAYER
console.log('\n4️⃣ Cache Layer:');
const cache = new MemoryCache();
cache.set('user-preferences', { theme: 'dark', language: 'en' }, 600);
cache.set('product-catalog', [{ id: 1, name: 'Item 1' }], 3600);
console.log('Cached preferences:', cache.get('user-preferences'));
console.log('Cache stats:', cache.stats());
cache.invalidate('product.*');
console.log('After invalidation:', cache.get('product-catalog'));
}
main().catch(console.error);
memory_patterns.py
import asyncio
import json
import math
from datetime import datetime
from typing import List, Dict, Any, Optional
# ==========================================
# SEMANTIC SEARCH WITH VECTORS
# ==========================================
class SemanticMemory:
def __init__(self):
self.embeddings = []
async def add(self, text: str, metadata: dict = None):
vector = self._mock_embedding(text)
self.embeddings.append({
'text': text,
'vector': vector,
'metadata': metadata or {},
'timestamp': datetime.now().isoformat()
})
async def search(self, query: str, top_k: int = 5) -> List[dict]:
query_vector = self._mock_embedding(query)
scored = [
{**e, 'similarity': self._cosine_similarity(query_vector, e['vector'])}
for e in self.embeddings
]
return sorted(scored, key=lambda x: x['similarity'], reverse=True)[:top_k]
def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
dot = sum(av * bv for av, bv in zip(a, b))
mag_a = math.sqrt(sum(av ** 2 for av in a))
mag_b = math.sqrt(sum(bv ** 2 for bv in b))
return dot / (mag_a * mag_b) if (mag_a * mag_b) > 0 else 0
def _mock_embedding(self, text: str) -> List[float]:
import random
return [random.random() for _ in range(1536)]
# ==========================================
# KNOWLEDGE GRAPH
# ==========================================
class KnowledgeGraph:
def __init__(self):
self.nodes: Dict[str, Dict] = {}
self.edges: List[Dict] = []
def add_entity(self, id: str, label: str, entity_type: str, props: dict = None):
self.nodes[id] = {
'id': id,
'label': label,
'type': entity_type,
'properties': props or {}
}
def add_relationship(
self,
source_id: str,
target_id: str,
relationship: str,
weight: float = 1.0
):
self.edges.append({
'source': source_id,
'target': target_id,
'relationship': relationship,
'weight': weight
})
def get_connected(self, node_id: str) -> List[Dict]:
connected = [
self.nodes[e['target']]
for e in self.edges
if e['source'] == node_id and e['target'] in self.nodes
]
return connected
def find_paths(self, source_id: str, target_id: str) -> List[List[Dict]]:
paths = []
visited = set()
def dfs(current, target, path):
if current == target:
paths.append(path)
return
visited.add(current)
for edge in self.edges:
if edge['source'] == current and edge['target'] not in visited:
dfs(edge['target'], target, path + [edge])
visited.discard(current)
dfs(source_id, target_id, [])
return paths
# ==========================================
# HIERARCHICAL MEMORY
# ==========================================
class HierarchicalMemory:
def __init__(self):
self.immediate = []
self.context_window = []
self.episodic = []
self.semantic = SemanticMemory()
self.graph = KnowledgeGraph()
async def remember(self, content: str, entry_type: str):
entry = {
'content': content,
'type': entry_type,
'timestamp': datetime.now().isoformat(),
'id': f"mem-{int(datetime.now().timestamp() * 1000)}"
}
self.immediate.append(entry)
if len(self.immediate) > 10:
promoted = self.immediate.pop(0)
self.context_window.append(promoted)
if len(self.context_window) > 20:
self.context_window.pop(0)
await self.semantic.add(content, entry)
self._extract_entities(content)
def _extract_entities(self, text: str):
words = text.split()
for i, word in enumerate(words):
if len(word) > 3:
node_id = word.lower()
if node_id not in self.graph.nodes:
self.graph.add_entity(node_id, word, 'concept')
if i > 0:
prev_word = words[i - 1].lower()
self.graph.add_relationship(prev_word, node_id, 'follows')
async def recall(self, query: str) -> dict:
immediate = [
e for e in self.immediate
if query.lower() in e['content'].lower()
]
semantic = await self.semantic.search(query, 5)
connected = self.graph.get_connected(query.lower()) if query.lower() in self.graph.nodes else []
return {
'immediate': immediate[-5:],
'semantic': [e['metadata'] for e in semantic],
'relationships': [n['label'] for n in connected]
}
def get_stats(self) -> dict:
return {
'immediate_memory': len(self.immediate),
'context_window': len(self.context_window),
'episodic_memory': len(self.episodic),
'graph_nodes': len(self.graph.nodes),
'graph_edges': len(self.graph.edges)
}
# ==========================================
# SHARED MEMORY
# ==========================================
class SharedMemoryStore:
def __init__(self):
self.store = {}
self.access_log = []
def set(
self,
key: str,
value: Any,
owner: str,
read_perms: List[str] = None,
write_perms: List[str] = None
):
self.store[key] = {
'key': key,
'value': value,
'owner': owner,
'timestamp': int(datetime.now().timestamp() * 1000),
'read_permissions': read_perms or [],
'write_permissions': write_perms or []
}
self._log_access(key, owner, 'write')
def get(self, key: str, requester: str) -> Any:
entry = self.store.get(key)
if not entry:
return None
if entry['read_permissions'] and requester not in entry['read_permissions']:
raise PermissionError(f"Access denied: {requester} cannot read {key}")
self._log_access(key, requester, 'read')
return entry['value']
async def share(self, key: str, owner: str, with_agents: List[str]):
entry = self.store.get(key)
if not entry:
raise KeyError(f"Key not found: {key}")
if entry['owner'] != owner:
raise PermissionError("Only owner can share")
entry['read_permissions'] = list(set(entry['read_permissions'] + with_agents))
print(f"✅ Shared {key} with: {', '.join(with_agents)}")
def _log_access(self, key: str, agent: str, operation: str):
self.access_log.append({
'key': key,
'agent': agent,
'operation': operation,
'timestamp': int(datetime.now().timestamp() * 1000)
})
def get_access_log(self, key: Optional[str] = None) -> List[dict]:
return [l for l in self.access_log if l['key'] == key] if key else self.access_log
# ==========================================
# CACHE LAYER
# ==========================================
class MemoryCache:
def __init__(self):
self.cache = {}
def set(self, key: str, value: Any, ttl_seconds: int = 300):
self.cache[key] = {
'value': value,
'timestamp': datetime.now(),
'ttl': ttl_seconds
}
def get(self, key: str) -> Any:
entry = self.cache.get(key)
if not entry:
return None
age = (datetime.now() - entry['timestamp']).total_seconds()
if age > entry['ttl']:
del self.cache[key]
return None
return entry['value']
def invalidate(self, pattern: str):
import re
regex = re.compile(pattern)
keys_to_delete = [k for k in self.cache.keys() if regex.match(k)]
for key in keys_to_delete:
del self.cache[key]
def stats(self) -> dict:
return {
'entries': len(self.cache),
'estimated_size': sum(
len(json.dumps(e['value']))
for e in self.cache.values()
),
'hit_rate': 0.85  # mock value, as in the TypeScript version
}
# ==========================================
# USAGE EXAMPLE
# ==========================================
async def main():
print('=== ADVANCED MEMORY PATTERNS ===\n')
# 1. HIERARCHICAL MEMORY
print('1️⃣ Hierarchical Memory:')
hierarchical = HierarchicalMemory()
await hierarchical.remember('The customer ordered product X', 'message')
await hierarchical.remember('Order ID is 12345', 'message')
await hierarchical.remember('Customer satisfaction is high', 'event')
recalled = await hierarchical.recall('customer order')
print('Recalled:', recalled)
print('Stats:', hierarchical.get_stats())
# 2. KNOWLEDGE GRAPH
print('\n2️⃣ Knowledge Graph:')
kg = KnowledgeGraph()
kg.add_entity('alice', 'Alice', 'entity', {'role': 'customer'})
kg.add_entity('order-123', 'Order #123', 'entity', {'total': 99.99})
kg.add_entity('product-x', 'Product X', 'entity', {'category': 'electronics'})
kg.add_relationship('alice', 'order-123', 'placed')
kg.add_relationship('order-123', 'product-x', 'contains')
paths = kg.find_paths('alice', 'product-x')
if paths:
print('Path from Alice to Product X:', ' -> '.join(e['relationship'] for e in paths[0]))
# 3. SHARED MEMORY
print('\n3️⃣ Shared Memory:')
shared = SharedMemoryStore()
shared.set(
'issue-diagnosis',
{'problem': 'connection timeout', 'solution': 'restart router'},
'tech-agent',
read_perms=['router-agent', 'billing-agent'],
write_perms=['tech-agent']
)
diagnosis = shared.get('issue-diagnosis', 'router-agent')
print('Router read:', diagnosis)
await shared.share('issue-diagnosis', 'tech-agent', ['support-agent'])
print('Access log:', shared.get_access_log('issue-diagnosis'))
# 4. CACHE LAYER
print('\n4️⃣ Cache Layer:')
cache = MemoryCache()
cache.set('user-preferences', {'theme': 'dark', 'language': 'en'}, 600)
cache.set('product-catalog', [{'id': 1, 'name': 'Item 1'}], 3600)
print('Cached preferences:', cache.get('user-preferences'))
print('Cache stats:', cache.stats())
cache.invalidate('product.*')
print('After invalidation:', cache.get('product-catalog'))
if __name__ == '__main__':
asyncio.run(main())

Pattern         | Best For            | Complexity | Scalability
----------------|---------------------|------------|------------
Hierarchical    | Multi-level context | Medium     | Good
Semantic Search | Similarity queries  | High       | Excellent
Knowledge Graph | Relationships       | High       | Good
Shared Memory   | Multi-agent sync    | Medium     | Good
Cache Layer     | Fast access         | Low        | Excellent
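In practice these patterns are layered rather than chosen one at a time. As a minimal sketch, reusing the SemanticMemory and MemoryCache classes from memory_patterns.py above (the CachedSemanticMemory name and 300-second TTL are illustrative, not part of the example), the cache can sit in front of semantic search so repeated queries skip the vector scan:

class CachedSemanticMemory:
    """Illustrative sketch: MemoryCache in front of SemanticMemory (both defined above)."""
    def __init__(self, semantic: SemanticMemory, cache: MemoryCache, ttl_seconds: int = 300):
        self.semantic = semantic
        self.cache = cache
        self.ttl_seconds = ttl_seconds

    async def search(self, query: str, top_k: int = 5):
        cached = self.cache.get(query)  # fast path: repeated query served from the cache
        if cached is not None:
            return cached
        results = await self.semantic.search(query, top_k)  # slow path: full vector scan
        self.cache.set(query, results, self.ttl_seconds)
        return results

New writes to the semantic store should be paired with cache.invalidate so stale results do not linger.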

Memory Sizing
  • Immediate: Last 10-20 messages (~5KB)
  • Context Window: Last 100-200 messages (~50KB)
  • Long-term memory (LTM): persist to a database and prune entries by age (see the sketch below)
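The long-term layer is only named in the example above, not implemented. A minimal sketch of date-based persistence using SQLite from the standard library (the ltm.db filename, long_term_memory table, and 90-day retention window are placeholder choices):

import sqlite3
import time

class LongTermMemory:
    """Minimal sketch: persist memories to SQLite and prune them by age."""
    def __init__(self, path: str = 'ltm.db', max_age_days: int = 90):
        self.conn = sqlite3.connect(path)
        self.max_age_seconds = max_age_days * 86400
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS long_term_memory ('
            'id INTEGER PRIMARY KEY, content TEXT, type TEXT, created_at REAL)'
        )

    def persist(self, content: str, entry_type: str) -> None:
        self.conn.execute(
            'INSERT INTO long_term_memory (content, type, created_at) VALUES (?, ?, ?)',
            (content, entry_type, time.time())
        )
        self.conn.commit()

    def cleanup_by_date(self) -> int:
        """Delete entries older than the retention window; returns rows removed."""
        cutoff = time.time() - self.max_age_seconds
        cur = self.conn.execute('DELETE FROM long_term_memory WHERE created_at < ?', (cutoff,))
        self.conn.commit()
        return cur.rowcount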
Pruning Strategy
  • Use Summarize for important long contexts
  • Use Semantic for recall-heavy workloads
  • Use FIFO for simple time-based purging (all three strategies are sketched below)
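HierarchicalMemory above only shifts overflow out of the immediate buffer, which is a FIFO. A rough sketch of all three strategies; summarize_fn and score_fn are stand-ins for whatever summarizer and similarity function you actually use (the cosine similarity from SemanticMemory is a natural fit for score_fn):

from typing import Callable, List

def prune_fifo(messages: List[dict], max_len: int) -> List[dict]:
    # FIFO: keep only the newest max_len messages
    return messages[-max_len:]

def prune_summarize(messages: List[dict], max_len: int,
                    summarize_fn: Callable[[str], str]) -> List[dict]:
    # Summarize: collapse the oldest overflow into a single summary entry
    if len(messages) <= max_len:
        return messages
    overflow, recent = messages[:-max_len], messages[-max_len:]
    summary = summarize_fn(' '.join(m['content'] for m in overflow))
    return [{'content': summary, 'type': 'summary'}] + recent

def prune_semantic(messages: List[dict], max_len: int, query: str,
                   score_fn: Callable[[str, str], float]) -> List[dict]:
    # Semantic: keep the messages most relevant to the current query or topic
    ranked = sorted(messages, key=lambda m: score_fn(query, m['content']), reverse=True)
    return ranked[:max_len]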
Vector Storage
  • Use Pinecone for cloud SaaS
  • Use Weaviate for self-hosted
  • Use FAISS for embedded, in-process deployments (see the sketch below)
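For the embedded option, a rough sketch of swapping SemanticMemory's linear scan for a FAISS index (assumes the faiss-cpu and numpy packages; IndexFlatIP with L2 normalization is one choice for cosine-style scores, and the 1536 dimension mirrors the mock embeddings above):

# pip install faiss-cpu numpy
import numpy as np
import faiss

dim = 1536                      # must match the embedding dimension used above
index = faiss.IndexFlatIP(dim)  # exact inner-product search; swap for IndexIVFFlat at larger scale
texts = []                      # FAISS returns positions, so keep the original texts alongside

def add(text, vector):
    # FAISS expects float32 matrices; L2-normalizing makes inner product equal cosine similarity
    v = np.asarray([vector], dtype='float32')
    faiss.normalize_L2(v)
    index.add(v)
    texts.append(text)

def search(query_vector, top_k=5):
    q = np.asarray([query_vector], dtype='float32')
    faiss.normalize_L2(q)
    scores, ids = index.search(q, top_k)
    return [(texts[i], float(s)) for i, s in zip(ids[0], scores[0]) if i != -1]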
Knowledge Graphs
  • Use Neo4j for complex relationships (see the sketch after this list)
  • Use ArangoDB for flexible schemas
  • Use DGraph for distributed graphs
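A minimal sketch of mirroring KnowledgeGraph's entities and relationships into Neo4j, assuming the official neo4j Python driver (5.x API); the bolt URI, credentials, and the Entity/RELATES_TO labels are placeholders:

# pip install neo4j
from neo4j import GraphDatabase

driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))  # placeholder URI/credentials

def add_entity(tx, node_id, label, node_type):
    # MERGE keeps writes idempotent when the same entity is extracted twice
    tx.run('MERGE (n:Entity {id: $id}) SET n.label = $label, n.type = $type',
           id=node_id, label=label, type=node_type)

def add_relationship(tx, source_id, target_id, relationship):
    # Relationship types cannot be parameterized in Cypher, so store the kind as a property
    tx.run('MATCH (a:Entity {id: $source}), (b:Entity {id: $target}) '
           'MERGE (a)-[:RELATES_TO {kind: $rel}]->(b)',
           source=source_id, target=target_id, rel=relationship)

with driver.session() as session:
    session.execute_write(add_entity, 'alice', 'Alice', 'entity')
    session.execute_write(add_entity, 'order-123', 'Order #123', 'entity')
    session.execute_write(add_relationship, 'alice', 'order-123', 'placed')
driver.close()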