---
name: session-compression
description: AI session compression techniques for managing multi-turn conversations efficiently through summarization, embedding-based retrieval, and intelligent context management.
progressive_disclosure:
  entry_point:
    - summary
    - when_to_use
    - quick_start
  full_content:
    - all
---

# AI Session Compression Techniques

## Summary

**Compress long AI conversations to fit context windows while preserving critical information.**

Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.

**Key Benefits:**
- **Cost Reduction:** 80-90% token cost savings through hierarchical memory
- **Performance:** 2x faster responses with compressed context
- **Scalability:** Handle conversations exceeding 1M tokens
- **Quality:** Preserve critical information with <2% accuracy loss

## When to Use

**Use session compression when:**
- Multi-turn conversations approach context window limits (>50% capacity)
- Long-running chat sessions (customer support, tutoring, code assistants)
- Token costs become significant (high-volume applications)
- Response latency increases due to large context
- Managing conversation history across multiple sessions

**Don't use when:**
- Short conversations (<10 turns) fitting easily in context
- Every detail must be preserved verbatim (legal, compliance)
- Single-turn or stateless interactions
- Context window usage is <30%

**Ideal scenarios:**
- Chatbots with 50+ turn conversations
- AI code assistants tracking long development sessions
- Customer support with multi-session ticket history
- Educational tutors with student progress tracking
- Multi-day collaborative AI workflows

## Quick Start

### Basic Setup with LangChain

```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

# Retrieve compressed context
context = memory.load_memory_variables({})
```

### Progressive Compression Pattern

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=[0.70, 0.85, 0.95]):
        self.thresholds = thresholds
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})

        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens

        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: Use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return

        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])

        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Replace old messages with summary
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent

# Usage
compressor = ProgressiveCompressor()

for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")
```

### Using Anthropic Prompt Caching (90% Cost Reduction)

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {
        "role": "assistant",
        "content": "Previous response..."
    },
    {
        "role": "user",
        "content": "New question"  # Not cached, changes frequently
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Cache hit reduces costs by 90% for cached content
```

---

## Core Concepts

### Context Windows and Token Limits

**Context window:** Maximum tokens an LLM can process in a single request (input + output).

**Current limits (2025):**
- Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
- GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
- Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)

**Token estimation:**
- English: ~4 characters per token
- Code: ~3 characters per token
- Rule of thumb: 1 token ≈ 0.75 words
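
A minimal sketch of these heuristics as a helper function (rough estimates only; use the model's tokenizer when exact counts matter):

```python
def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough token estimate using the ~4 chars/token (English) and
    ~3 chars/token (code) heuristics above."""
    chars_per_token = 3 if is_code else 4
    return max(1, len(text) // chars_per_token)

# A 1,200-character English message is roughly 300 tokens (~225 words)
print(estimate_tokens("x" * 1200))  # 300
```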

**Why compression matters:**
- **Cost:** Claude Sonnet costs $3/$15 per 1M input/output tokens
- **Latency:** Larger contexts increase processing time
- **Quality:** Excessive context can dilute attention on relevant information

### Compression Ratios

**Compression ratio = Original tokens / Compressed tokens**

**Industry benchmarks:**
- Extractive summarization: 2-3x
- Abstractive summarization: 5-10x
- Hierarchical summarization: 20x+
- LLMLingua (prompt compression): 20x with 1.5% accuracy loss
- KVzip (KV cache compression): 3-4x with 2x speed improvement

**Target ratios by use case:**
- Customer support: 5-7x (preserve details)
- General chat: 8-12x (balance quality/efficiency)
- Code assistants: 3-5x (preserve technical accuracy)
- Long documents: 15-20x (extract key insights)

### Progressive Compression Thresholds

**Industry standard pattern:**

```
Context Usage    Action                     Technique
─────────────────────────────────────────────────────────
0-70%           No compression             Store verbatim
70-85%          Light compression          Remove redundancy
85-95%          Medium compression         Summarize old messages
95-100%         Aggressive compression     Hierarchical + RAG
```

**Implementation guidelines:**
- **70% threshold:** Remove duplicate/redundant messages, semantic deduplication
- **85% threshold:** Summarize messages older than 20 turns, keep recent 10-15
- **95% threshold:** Multi-level hierarchical summarization + vector store archival
- **Emergency (100%):** Drop least important messages, aggressive summarization

---

## Compression Techniques

### 1. Summarization Techniques

#### 1.1 Extractive Summarization

**Selects key sentences/phrases without modification.**

**Pros:** No hallucination, fast, deterministic
**Cons:** Limited compression (2-3x), may feel disjointed
**Best for:** Legal/compliance, short-term compression

```python
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])

    return [messages[i] for i in top_indices]
```

#### 1.2 Abstractive Summarization

**Uses LLMs to semantically condense conversation history.**

**Pros:** Higher compression (5-10x), coherent, synthesizes information
**Cons:** Risk of hallucination, higher cost, less deterministic
**Best for:** General chat, customer support, multi-session continuity

```python
from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )

    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }
```

#### 1.3 Hierarchical Summarization (Multi-Level)

**Creates summaries of summaries in a tree structure.**

**Pros:** Extreme compression (20x+), handles 1M+ token conversations
**Cons:** Complex implementation, multiple LLM calls, information loss accumulates
**Best for:** Long-running conversations, multi-session applications

**Architecture:**
```
Level 0 (Raw):    [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):  [Summary1-2]  [Summary3-4]  [Summary5-6]  [Summary7-8]
Level 2 (Group):  [Summary1-4]              [Summary5-8]
Level 3 (Session): [Overall Session Summary]
```

```python
from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)

        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]

        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])

        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)

        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )

        return {
            "role": "system",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0

        # Prioritize recent raw messages
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens

        # Add summaries from higher levels
        for level in range(1, len(self.levels)):
            for summary in self.levels[level]:
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens

        return context
```

**Academic reference:** "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)

#### 1.4 Rolling Summarization (Continuous)

**Continuously compresses conversation with sliding window.**

**Pros:** Low latency, predictable token usage, simple
**Cons:** Early details over-compressed, no information recovery
**Best for:** Real-time chat, streaming conversations

```python
from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)

        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")

        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )

        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context
```

### 2. Embedding-Based Approaches

#### 2.1 RAG (Retrieval-Augmented Generation)

**Store full conversation in vector database, retrieve only relevant chunks.**

**Pros:** Extremely scalable, no information loss, high relevance
**Cons:** Requires vector DB infrastructure, retrieval latency
**Best for:** Knowledge bases, customer support with large history

```python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )

        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0

        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )

            n_results = max(1, min(10, (max_tokens - token_count) // 100))
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break

                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        return context
```

**Vector database options:**
- **ChromaDB:** Embedded, easy local development
- **Pinecone:** Managed, 50ms p95 latency
- **Weaviate:** Open-source, hybrid search
- **Qdrant:** High performance, payload filtering

#### 2.2 Vector Search and Clustering

**Group similar messages into clusters, represent with centroids.**

**Pros:** Reduces redundancy, identifies themes, multi-topic handling
**Cons:** Requires sufficient data, may lose nuances
**Best for:** Multi-topic conversations, meeting summaries

```python
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)

            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return representatives."""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })

        return compressed
```

#### 2.3 Semantic Deduplication

**Remove semantically similar messages that convey redundant information.**

**Pros:** Reduces redundancy without losing unique content
**Cons:** Requires threshold tuning, O(n²) complexity
**Best for:** FAQ systems, repetitive conversations

```python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages

        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break

            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]
```

### 3. Token-Efficient Strategies

#### 3.1 Message Prioritization

**Assign importance scores and retain only high-priority content.**

**Pros:** Retains most important information, flexible criteria
**Cons:** Scoring is heuristic-based, may break flow
**Best for:** Mixed-importance conversations, filtering noise

```python
import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []

        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))

        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]

        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order

        return [msg for msg, score, idx in top_messages]
```

#### 3.2 Delta Compression

**Store only changes between consecutive messages.**

**Pros:** Highly efficient for incremental changes
**Cons:** Reconstruction overhead, not suitable for all content
**Best for:** Code assistants with incremental edits

```python
import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # Find most similar previous message
        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:
            # Calculate delta with ndiff so it can be reversed via difflib.restore
            diff = list(difflib.ndiff(
                last_msg['content'].splitlines(),
                message['content'].splitlines()
            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta if compression achieved
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()

        for delta_info in self.deltas:
            # difflib.restore(delta, 2) rebuilds the "new" side of an ndiff delta
            restored_lines = difflib.restore(delta_info['delta'], 2)
            messages.append({
                'role': delta_info['role'],
                'content': '\n'.join(restored_lines)
            })

        return messages
```

### 4. LangChain Memory Types

#### 4.1 ConversationSummaryMemory

**Automatically summarizes conversation as it progresses.**

```python
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])
```

**Pros:** Automatic summarization, simple API
**Cons:** Every turn triggers LLM call
**Best for:** Medium conversations (20-50 turns)

#### 4.2 ConversationSummaryBufferMemory

**Hybrid: Recent messages verbatim, older summarized.**

```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})
```

**Pros:** Best balance of detail and compression
**Cons:** Requires token limit tuning
**Best for:** Most production applications

#### 4.3 ConversationTokenBufferMemory

**Maintains fixed token budget, drops oldest when exceeded.**

```python
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# Simple FIFO when token limit exceeded
```

**Pros:** Predictable token usage, simple
**Cons:** Loses old information completely
**Best for:** Real-time chat with strict limits

#### 4.4 VectorStoreRetrieverMemory

**Stores all messages in vector database, retrieves relevant ones.**

```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Automatically retrieves most relevant context
```

**Pros:** Infinite conversation length, semantic retrieval
**Cons:** Requires vector DB, retrieval overhead
**Best for:** Long-running conversations, knowledge bases

### 5. Anthropic-Specific Patterns

#### 5.1 Prompt Caching (90% Cost Reduction)

**Cache static context to reduce token costs.**

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']

    # Add cache control to last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Subsequent calls with same cached context cost 90% less
```

**Cache TTL:** 5 minutes
**Savings:** 90% cost reduction for cached tokens
**Limits:** Max 4 cache breakpoints per request
**Best practices:**
- Cache conversation history, not current query
- Update cache when context changes significantly
- Combine with summarization for maximum efficiency
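
The examples above cache message content; stable system prompts can also be cached by passing the `system` parameter as a list of text blocks. A minimal sketch (cache minimums apply, e.g. roughly 1024 tokens on Sonnet):

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "Long, stable system instructions and knowledge base context...",
            "cache_control": {"type": "ephemeral"}  # cached across requests
        }
    ],
    messages=[{"role": "user", "content": "New question"}]
)
```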

#### 5.2 Extended Thinking for Compression Planning

**Use extended thinking to plan optimal compression strategy.**

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x

Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation
recommendation = response.content[-1].text
```

---

## Production Patterns

### Checkpointing and Persistence

**Save compression state for recovery and resume.**

```python
import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)
```

### Resume Workflows

**Continue conversations across sessions.**

```python
from anthropic import Anthropic
import json
import os
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        # Add user message
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # Build context (with compression); it already ends with the new user turn
        context = self._build_context()

        # Pass the rolling summary via the top-level system parameter
        # (the Messages API only accepts 'user' and 'assistant' roles in messages)
        request = {
            "model": "claude-3-5-sonnet-20241022",
            "max_tokens": 1024,
            "messages": context
        }
        if self.memory['summary']:
            request["system"] = f"[Previous conversation summary]\n{self.memory['summary']}"

        response = self.client.messages.create(**request)

        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()

        # Save state
        self._save()

        return assistant_message

    def _build_context(self):
        """Build message context: recent turns verbatim; the summary (if any)
        is supplied separately as the system prompt."""
        return self.memory['messages'][-10:]

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return

        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        os.makedirs('sessions', exist_ok=True)
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)

# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context
```

### Hybrid Approaches (Best Practice)

**Combine multiple techniques for optimal results.**

```python
from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10

        # Rolling summary
        self.rolling_summary = None

        # Vector store (long-term)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # Compression thresholds
        self.thresholds = {
            'light': 0.70,    # Start basic compression
            'medium': 0.85,   # Aggressive summarization
            'heavy': 0.95     # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)

        # Check compression needs
        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])

            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )

            self.rolling_summary = response.content[0].text

            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0

        # 1. Add rolling summary (if it exists)
        # Use the 'user' role: the Messages API only accepts 'user'/'assistant'
        # roles inside messages (alternatively, pass the summary via the
        # top-level system parameter)
        if self.rolling_summary:
            summary_msg = {
                'role': 'user',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )

            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break

                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0

        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet context window

# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")

memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)

# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)
```

---

## Performance Benchmarks

### Compression Efficiency

| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|-----------|------------------|--------------|---------|-------------|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |

### Token Savings by Use Case

**Customer Support (50-turn conversation):**
- No compression: ~8,000 tokens/request
- Rolling summary: ~2,000 tokens/request (75% reduction)
- Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)

**Code Assistant (100-turn session):**
- No compression: ~25,000 tokens/request
- Hierarchical: ~5,000 tokens/request (80% reduction)
- Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)

**Educational Tutor (multi-session):**
- No compression: Would exceed context window
- RAG + summarization: ~3,000 tokens/request
- Infinite session length enabled

### Cost Analysis

**Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)**

**1,000 conversations, 50 turns each:**
- **No compression:**
  - Avg 8K tokens/request × 50K requests = 400M tokens
  - Cost: $1,200

- **With rolling summarization:**
  - Avg 2K tokens/request × 50K requests = 100M tokens
  - Summarization overhead: +10M tokens
  - Cost: $330 (72% savings)

- **With hybrid system + caching:**
  - First turn: 2K tokens (no cache)
  - Subsequent: 200 tokens effective (90% cache hit)
  - Total: ~15M tokens effective
  - Cost: $45 (96% savings)
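
The arithmetic behind these figures, as a quick sketch (input-token pricing only, using the $3 per 1M input rate quoted above):

```python
INPUT_PRICE_PER_M = 3.00  # USD per 1M input tokens (Claude Sonnet, input side only)

def scenario_cost(requests: int, avg_tokens_per_request: int, overhead_tokens: int = 0) -> float:
    total_tokens = requests * avg_tokens_per_request + overhead_tokens
    return total_tokens / 1_000_000 * INPUT_PRICE_PER_M

requests = 1_000 * 50  # 1,000 conversations x 50 turns

print(scenario_cost(requests, 8_000))              # no compression: ~$1,200
print(scenario_cost(requests, 2_000, 10_000_000))  # rolling summary + overhead: ~$330
print(scenario_cost(requests, 300))                # hybrid + caching (~15M effective): ~$45
```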

---

## Tool Recommendations

### Memory Management Tools

#### Mem0 (Recommended for Production)

**Best for:** Hybrid memory systems with minimal code

```python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
# (method names follow the Mem0 platform client; check your SDK version)
client.add(
    [
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ],
    user_id="user123"
)

# Retrieve relevant context
context = client.search(
    "What programming language am I using?",
    user_id="user123"
)
```

**Features:**
- Automatic hierarchical summarization
- Built-in RAG retrieval
- Multi-user session management
- Analytics dashboard

**Pricing:** $0.40/1K memory operations

#### Zep

**Best for:** Low-latency production deployments

```python
from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)

# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
```

**Features:**
- <100ms retrieval latency
- Automatic fact extraction
- Entity recognition
- Session management

**Pricing:** Open-source (self-hosted) or $0.50/1K operations (cloud)

#### ChromaDB

**Best for:** Self-hosted vector storage

```python
import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)

# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)
```

**Features:**
- Fully open-source
- Embedded or client-server
- Fast local development

**Pricing:** Free (self-hosted)

#### LangChain

**Best for:** Rapid prototyping and experimentation

```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
```

**Features:**
- Multiple memory types
- Framework integration
- Extensive documentation

**Pricing:** Free (uses your LLM API costs)

### Compression Libraries

#### LLMLingua

**Best for:** Extreme compression with minimal quality loss

```python
from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)

# Achieves 20x compression with 1.5% accuracy loss
```

**Features:**
- 20x compression ratios
- <2% quality degradation
- Fast inference (<500ms)

**Pricing:** Free (open-source)

---

## Use Cases and Patterns

### Chatbot (Customer Support)

**Requirements:**
- Multi-turn conversations (50-100 turns)
- Preserve customer context
- Fast response times
- Cost-efficient

**Recommended approach:**
- ConversationSummaryBufferMemory (LangChain)
- 70% threshold: Semantic deduplication
- 85% threshold: Rolling summarization
- Prompt caching for frequent patterns

**Implementation:**

```python
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

# Retrieve compressed context
context = memory.load_memory_variables({})
```

### Code Assistant

**Requirements:**
- Long development sessions (100+ turns)
- Preserve technical details
- Handle large code blocks
- Track incremental changes

**Recommended approach:**
- Hierarchical summarization for overall context
- RAG retrieval for specific code references
- Delta compression for iterative edits
- Prompt caching for system prompts

**Implementation:**

```python
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class CodeAssistantMemory:
    """Reuses HierarchicalMemory, RAGMemory, and DeltaCompressor from earlier sections."""

    def __init__(self, openai_client):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Normalize to the {'role', 'content'} shape the memory classes expect
        message = {
            'role': 'user',
            'content': code_change['description']
        }

        # Store in hierarchy
        self.hierarchy.add_message(message)

        # Store in RAG for retrieval
        self.rag.add_message(message)

        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(message)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context
```

### Educational Tutor

**Requirements:**
- Multi-session tracking
- Student progress persistence
- Personalized context retrieval
- Long-term knowledge retention

**Recommended approach:**
- VectorStoreRetrieverMemory for multi-session
- Fact extraction for student knowledge
- Progressive compression across sessions
- Resumable conversations

**Implementation:**

```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )

        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })
```

---

## Best Practices

### 1. Choose the Right Technique for Your Use Case

- **Short conversations (<20 turns):** No compression needed
- **Medium conversations (20-50 turns):** ConversationSummaryBufferMemory
- **Long conversations (50-100 turns):** Hierarchical or rolling summarization
- **Very long (100+ turns):** Hybrid (RAG + summarization + caching)
- **Multi-session:** VectorStoreRetrieverMemory or Mem0
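
A minimal selector that encodes this mapping (the turn counts are rough guidelines, not hard cutoffs):

```python
def pick_memory_strategy(expected_turns: int, multi_session: bool = False) -> str:
    """Map expected conversation length to the recommendations above."""
    if multi_session:
        return "VectorStoreRetrieverMemory or Mem0"
    if expected_turns < 20:
        return "no compression"
    if expected_turns <= 50:
        return "ConversationSummaryBufferMemory"
    if expected_turns <= 100:
        return "hierarchical or rolling summarization"
    return "hybrid (RAG + summarization + caching)"

print(pick_memory_strategy(75))  # hierarchical or rolling summarization
```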

### 2. Implement Progressive Compression

Don't compress aggressively from the start. Use thresholds:
- 0-70%: Store verbatim
- 70-85%: Light compression (deduplication)
- 85-95%: Medium compression (summarization)
- 95-100%: Aggressive compression (hierarchical)

### 3. Combine Techniques

Single-technique approaches are suboptimal. Best production systems use:
- Rolling summarization (short-term)
- RAG retrieval (long-term)
- Prompt caching (cost optimization)
- Semantic deduplication (redundancy removal)

### 4. Monitor Quality Metrics

Track compression impact:
- Response relevance score
- Information retention rate
- User satisfaction metrics
- Token usage reduction
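
A minimal sketch for recording these metrics per request (the relevance and retention scores are placeholders for whatever evaluator you use):

```python
from dataclasses import dataclass, field

@dataclass
class CompressionRecord:
    original_tokens: int
    compressed_tokens: int
    relevance_score: float   # from your own evaluator (placeholder)
    retention_score: float   # fraction of key facts preserved (placeholder)

    @property
    def compression_ratio(self) -> float:
        return self.original_tokens / max(self.compressed_tokens, 1)

@dataclass
class CompressionMetrics:
    records: list = field(default_factory=list)

    def add(self, record: CompressionRecord):
        self.records.append(record)

    def summary(self) -> dict:
        n = max(len(self.records), 1)
        return {
            "avg_compression_ratio": sum(r.compression_ratio for r in self.records) / n,
            "avg_relevance": sum(r.relevance_score for r in self.records) / n,
            "avg_retention": sum(r.retention_score for r in self.records) / n,
        }
```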

### 5. Use Prompt Caching Strategically

Cache stable content:
- Conversation summaries
- System prompts
- Knowledge base context
- User profiles

Don't cache frequently changing content:
- Current user query
- Real-time data
- Session-specific state

### 6. Implement Checkpointing

Save compression state for:
- Recovery from failures
- Multi-session continuity
- Analytics and debugging
- A/B testing different strategies

### 7. Tune Compression Parameters

Test and optimize:
- Summary token limits
- Compression thresholds
- Retrieval result counts
- Cache TTLs
- Chunk sizes for hierarchical

### 8. Handle Edge Cases

Plan for:
- Very long messages (split or compress individually; see the sketch after this list)
- Code blocks (preserve formatting)
- Multi-language content
- Rapidly changing context
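
For the first item (very long messages), a minimal splitter that breaks a message on paragraph boundaries to fit a rough token budget (same ~4 chars/token heuristic used elsewhere in this document):

```python
def split_long_message(content: str, max_tokens: int = 2000) -> list:
    """Split an over-long message into chunks that each fit a rough token budget."""
    max_chars = max_tokens * 4  # ~4 characters per token
    chunks, current = [], ""

    for paragraph in content.split("\n\n"):
        candidate = f"{current}\n\n{paragraph}" if current else paragraph
        if len(candidate) > max_chars and current:
            chunks.append(current)
            current = paragraph
        else:
            current = candidate

    if current:
        chunks.append(current)
    return chunks
```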

---

## Troubleshooting

### Problem: Summary loses critical information

**Solutions:**
- Lower compression ratio (less aggressive)
- Implement importance scoring to preserve key messages
- Use extractive summarization for critical sections
- Increase summary token budget

### Problem: Retrieval returns irrelevant context

**Solutions:**
- Improve embedding model quality
- Add metadata filtering (timestamps, topics); see the sketch below
- Adjust similarity threshold
- Use hybrid search (semantic + keyword)
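
For the metadata filtering suggestion, a sketch using ChromaDB's `where` filter; the `topic` and `timestamp` metadata fields are assumptions about how you tag messages at archive time:

```python
import time
import chromadb

client = chromadb.Client()
collection = client.get_or_create_collection("conversation")

# Assumes messages were archived with metadata such as
# {"role": "...", "topic": "...", "timestamp": <unix seconds>}
results = collection.query(
    query_texts=["What did we agree on pricing?"],  # or query_embeddings=[...]
    n_results=5,
    where={
        "$and": [
            {"topic": {"$eq": "pricing"}},
            {"timestamp": {"$gt": time.time() - 7 * 24 * 3600}}  # last 7 days
        ]
    }
)
```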

### Problem: High latency from compression

**Solutions:**
- Compress asynchronously (background tasks)
- Use faster models for summarization (Haiku instead of Sonnet)
- Cache summaries more aggressively
- Reduce compression frequency

### Problem: Conversations still exceeding context window

**Solutions:**
- Implement hierarchical compression
- Archive to vector database more aggressively
- Use more aggressive compression ratios
- Consider switching to model with larger context window

### Problem: High costs despite compression

**Solutions:**
- Implement prompt caching
- Use cheaper models for summarization (Haiku)
- Batch summarization operations
- Reduce summarization frequency

### Problem: Lost conversation continuity

**Solutions:**
- Increase recent message window
- Include summary in every request
- Use more descriptive summaries
- Implement session resumption with context injection

---

## Advanced Topics

### Streaming Compression

**Compress in real-time as conversation progresses:**

```python
import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses.

    Assumes an async message stream and a ProgressiveCompressor variant that
    exposes should_compress() and an async compress_async() method.
    """
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message["role"], message["content"])

        # Compression happens asynchronously, without blocking the stream
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.messages
```

### Multi-User Session Management

**Handle concurrent conversations with shared context:**

```python
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds.

        Assumes each session tracks a last_activity timestamp and that
        _archive_session (not shown) persists the session before removal.
        """
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]

        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]
```

### Custom Importance Scoring

**Train ML models to score message importance:**

```python
from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']
```

### Context Window Utilization Optimization

**Maximize information density within token budget:**

```python
def optimize_context_allocation(max_tokens: int):
    """
    Allocate the context window budget across memory tiers.

    A split that works well in practice (tune for your workload):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
```

---

## Future Directions

### Emerging Techniques (2025+)

**1. Infinite Attention Mechanisms**
- Context windows in the millions of tokens (Gemini 1.5 Pro at 2M today, with larger windows expected)
- Reduces need for compression but doesn't eliminate cost concerns

**2. Learned Compression Models**
- Neural networks trained to compress conversation optimally
- Maintain semantic meaning while minimizing tokens
- Examples: LLMLingua-2 and related learned prompt compressors

**3. Multimodal Session Compression**
- Compress conversations with images, audio, video
- Maintain cross-modal context relationships

**4. Federated Memory Systems**
- Distributed compression across multiple memory stores
- Privacy-preserving compression for sensitive conversations

**5. Adaptive Compression Strategies**
- RL-based systems that learn optimal compression per user/domain
- Dynamic threshold adjustment based on conversation importance

---

## References

### Academic Papers
- "Recursively Summarizing Enables Long-Term Dialogue Memory" (arXiv:2308.15022)
- "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
- "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)

### Documentation
- [Anthropic Prompt Caching](https://docs.anthropic.com/claude/docs/prompt-caching)
- [LangChain Memory](https://python.langchain.com/docs/modules/memory/)
- [OpenAI Embeddings](https://platform.openai.com/docs/guides/embeddings)

### Tools
- [Mem0](https://mem0.ai/) - Managed memory service
- [Zep](https://www.getzep.com/) - Fast memory layer
- [LLMLingua](https://github.com/microsoft/LLMLingua) - Prompt compression
- [ChromaDB](https://www.trychroma.com/) - Vector database

---

**Last Updated:** 2025-11-30
**Version:** 1.0.0
**License:** MIT
