TokenPak Usage Examples¶
Copy-paste ready examples for common TokenPak patterns.
Example 1: Hello World — Basic Proxy Setup¶
Problem: You want to route your Anthropic API calls through TokenPak to get automatic compression and cost tracking.
Solution: Start the proxy, point your client at localhost:8766 instead of api.anthropic.com.
Setup¶
# Install TokenPak
pip install tokenpak
# Start the proxy (default port 8766, hybrid compression mode)
tokenpak serve
# Or with custom settings
TOKENPAK_PORT=8766 TOKENPAK_MODE=hybrid tokenpak serve
Drop-in Replacement¶
import anthropic
# Before (direct to Anthropic)
client = anthropic.Anthropic(api_key="sk-ant-...")
# After (through TokenPak proxy) — ONE LINE CHANGE
client = anthropic.Anthropic(
api_key="sk-ant-...",
base_url="http://localhost:8766",
)
# Your existing code works unchanged
message = client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": "Explain quantum entanglement."}]
)
print(message.content[0].text)
Expected Output¶
# Proxy startup:
TokenPak Forward Proxy v4
Listening: http://0.0.0.0:8766
Mode: hybrid (Protected/Code strict, Narrative compressed)
Vault: 2,943 blocks
# Per-request log:
[req] claude-opus-4-5 | 1,240 in → 892 sent (28% saved) | 156 out | $0.0089
Example 2: Custom Compression Mode¶
Problem: You have a codebase heavy with system prompts and code blocks. Default hybrid mode doesn't compress code. You want maximum savings.
Solution: Switch to aggressive mode to compress everything (except PROTECTED content).
# Aggressive: compress narrative + code (keep system prompts intact)
TOKENPAK_MODE=aggressive tokenpak serve
# Or per-request override via header
import requests
response = requests.post(
"http://localhost:8766/v1/messages",
headers={
"x-api-key": "sk-ant-...",
"x-tokenpak-mode": "aggressive", # Override mode for this request
"Content-Type": "application/json",
},
json={
"model": "claude-sonnet-4-5",
"max_tokens": 512,
"messages": [
{
"role": "user",
"content": (
"Here's a 500-line Python file:\n"
+ open("myproject/main.py").read()
+ "\n\nWhat's the main entry point?"
)
}
],
}
)
print(response.json()["content"][0]["text"])
Mode Comparison¶
| Mode | Narrative | Code | Config | Protected |
|---|---|---|---|---|
strict |
❌ No | ❌ No | ❌ No | ❌ No |
hybrid |
✅ Yes | ❌ No | ❌ No | ❌ No |
aggressive |
✅ Yes | ✅ Yes | ✅ Yes | ❌ No |
Protected content is NEVER compressed — system prompts, SOUL.md, tool schemas are always sent verbatim.
Example 3: Vault Context Injection¶
Problem: You have project documentation you want automatically injected into relevant requests without manually including it every time.
Solution: Index your vault and let TokenPak inject relevant context automatically.
# Index your project docs
tokenpak index ~/my-project/docs
# Or point to a custom path
VAULT_INDEX_PATH=~/my-project/.tokenpak tokenpak index ~/my-project/docs
# Rebuild on change
tokenpak index ~/my-project/docs --watch
# TokenPak will automatically inject relevant vault chunks
# based on semantic similarity to your request
client = anthropic.Anthropic(
api_key="sk-ant-...",
base_url="http://localhost:8766",
)
# This request will automatically get relevant docs injected
# from your vault without you doing anything
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": "How do I configure the cache timeout in our system?"
}]
)
# Response will have context from your docs injected automatically
# Check what was injected in the last request
curl http://localhost:8766/recent | python3 -m json.tool | grep -A5 "vault_injection"
# {
# "vault_injection": {
# "chunks_injected": 3,
# "tokens_injected": 412,
# "top_chunks": ["docs/config.md#cache-timeout", ...]
# }
# }
Example 4: Error Handling + Retry Logic¶
Problem: You need robust error handling when the upstream API is rate-limited or unavailable.
Solution: Use the circuit breaker information from /health to detect provider issues, and implement exponential backoff.
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_tokenpak_session(
proxy_url: str = "http://localhost:8766",
max_retries: int = 3,
backoff_factor: float = 0.5,
) -> requests.Session:
"""Create a requests Session that routes through TokenPak with retry logic."""
session = requests.Session()
retry = Retry(
total=max_retries,
backoff_factor=backoff_factor,
status_forcelist=[429, 500, 502, 503, 529], # 529 = Anthropic overloaded
allowed_methods=["POST"],
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
return session
def check_proxy_health(proxy_url: str = "http://localhost:8766") -> dict:
"""Check if proxy and upstream providers are healthy."""
try:
r = requests.get(f"{proxy_url}/health", timeout=2)
data = r.json()
cb = data.get("circuit_breakers", {})
if cb.get("any_open"):
open_providers = [
name for name, status in cb.get("providers", {}).items()
if status.get("state") == "open"
]
print(f"⚠️ Circuit breaker OPEN for: {open_providers}")
return data
except requests.RequestException:
return {"status": "unreachable"}
def chat_with_retry(
prompt: str,
model: str = "claude-sonnet-4-5",
proxy_url: str = "http://localhost:8766",
api_key: str = None,
) -> str:
"""Send a chat message with full retry + health check logic."""
# Check health before sending
health = check_proxy_health(proxy_url)
if health.get("status") not in ("ok", "degraded"):
raise RuntimeError(f"Proxy unhealthy: {health}")
session = create_tokenpak_session(proxy_url)
for attempt in range(3):
try:
r = session.post(
f"{proxy_url}/v1/messages",
headers={
"x-api-key": api_key or "sk-ant-...",
"Content-Type": "application/json",
},
json={
"model": model,
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
},
timeout=30,
)
r.raise_for_status()
return r.json()["content"][0]["text"]
except requests.HTTPError as e:
if e.response.status_code == 529:
wait = 2 ** attempt
print(f"Rate limited, waiting {wait}s (attempt {attempt + 1}/3)")
time.sleep(wait)
else:
raise
raise RuntimeError("Max retries exceeded")
# Usage
if __name__ == "__main__":
response = chat_with_retry(
"What is the capital of France?",
api_key="sk-ant-your-key-here",
)
print(response)
Example 5: Real-Time Streaming via WebSocket¶
Problem: You want real-time streaming responses for a chat interface without HTTP polling overhead.
Solution: Connect to the /ws WebSocket endpoint (runs on PROXY_PORT+1 by default).
import asyncio
import gzip
import json
import websockets
async def stream_chat(
prompt: str,
model: str = "claude-sonnet-4-5",
ws_url: str = "ws://localhost:8767/ws",
api_key: str = None,
):
"""Stream a chat response via WebSocket with gzip compression."""
async with websockets.connect(ws_url) as ws:
# Send request
await ws.send(json.dumps({
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"stream": True,
"api_key": api_key or "sk-ant-...",
}))
full_text = ""
print("Assistant: ", end="", flush=True)
async for message in ws:
# Messages are gzip-compressed binary frames
if isinstance(message, bytes):
decompressed = gzip.decompress(message).decode("utf-8")
event = json.loads(decompressed)
else:
event = json.loads(message)
event_type = event.get("type")
if event_type == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
text = delta.get("text", "")
print(text, end="", flush=True)
full_text += text
elif event_type == "message_stop":
print() # newline after streaming
break
elif event_type == "stats":
usage = event.get("usage", {})
print(f"\n[tokens: in={usage.get('input_tokens',0)}, "
f"out={usage.get('output_tokens',0)}]")
elif "error" in event:
print(f"\n[error: {event['error']['message']}]")
break
return full_text
async def main():
# Multiple concurrent streams
tasks = [
stream_chat("Tell me a joke"),
stream_chat("What is 2+2?"),
]
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
asyncio.run(main())
Verify WebSocket Is Running¶
# Check proxy health (includes WS status)
curl http://localhost:8766/health | python3 -m json.tool
# Test WebSocket with websocat (CLI tool)
# pip install websocat (or brew install websocat)
echo '{"model":"claude-haiku-4-5","messages":[{"role":"user","content":"hi"}],"max_tokens":50,"api_key":"sk-ant-..."}' \
| websocat ws://localhost:8767/ws
Quick Reference¶
| Task | Command / Code |
|---|---|
| Start proxy | tokenpak serve |
| Check health | curl http://localhost:8766/health |
| View stats | curl http://localhost:8766/stats |
| See last request | curl http://localhost:8766/recent |
| Rebuild vault index | bash ~/vault/06_RUNTIME/scripts/rebuild-vault-index.sh |
| Hybrid mode | TOKENPAK_MODE=hybrid tokenpak serve |
| Aggressive mode | TOKENPAK_MODE=aggressive tokenpak serve |
| WebSocket port | TOKENPAK_WS_PORT=8767 tokenpak serve (default: PROXY_PORT+1) |
| Disable compression | TOKENPAK_MODE=strict tokenpak serve |
Environment Variables¶
TOKENPAK_PORT=8766 # HTTP proxy port (default: 8766)
TOKENPAK_MODE=hybrid # Compression mode: strict|hybrid|aggressive
TOKENPAK_WS_PORT=8767 # WebSocket port (default: PROXY_PORT+1)
TOKENPAK_REQUEST_TIMEOUT=30 # Per-request upstream timeout (seconds, 0=disabled)
VAULT_INDEX_PATH=~/.tokenpak # Path to vault index directory
TOKENPAK_INJECT_BUDGET=2200 # Max tokens injected per request
TOKENPAK_INJECT_MIN_SCORE=0.6 # Minimum similarity score for vault injection
See Also¶
- API Reference — Full endpoint documentation
- Production SLA — Performance targets
- Compression Benchmark — Compression analysis and optimization guide