poc-memory v0.4.0: graph-structured memory with consolidation pipeline

Rust core:
- Cap'n Proto append-only storage (nodes + relations)
- Graph algorithms: clustering coefficient, community detection,
  schema fit, small-world metrics, interference detection
- BM25 text similarity with Porter stemming
- Spaced repetition replay queue
- Commands: search, init, health, status, graph, categorize,
  link-add, link-impact, decay, consolidate-session, etc.

Python scripts:
- Episodic digest pipeline: daily/weekly/monthly-digest.py
- retroactive-digest.py for backfilling
- consolidation-agents.py: 3 parallel Sonnet agents
- apply-consolidation.py: structured action extraction + apply
- digest-link-parser.py: extract ~400 explicit links from digests
- content-promotion-agent.py: promote episodic obs to semantic files
- bulk-categorize.py: categorize all nodes via single Sonnet call
- consolidation-loop.py: multi-round automated consolidation

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-02-28 22:17:00 -05:00
commit 23fac4e5fe
35 changed files with 9388 additions and 0 deletions

312
scripts/apply-consolidation.py Executable file
View file

@ -0,0 +1,312 @@
#!/usr/bin/env python3
"""apply-consolidation.py — convert consolidation reports to actions.
Reads consolidation agent reports, sends them to Sonnet to extract
structured actions, then executes them (or shows dry-run).
Usage:
apply-consolidation.py # dry run (show what would happen)
apply-consolidation.py --apply # execute actions
apply-consolidation.py --report FILE # use specific report file
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 300) -> str:
"""Call Sonnet via the wrapper script."""
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def find_latest_reports() -> list[Path]:
"""Find the most recent set of consolidation reports."""
reports = sorted(AGENT_RESULTS_DIR.glob("consolidation-*-*.md"),
reverse=True)
if not reports:
return []
# Group by timestamp
latest_ts = reports[0].stem.split('-')[-1]
return [r for r in reports if r.stem.endswith(latest_ts)]
def build_action_prompt(reports: list[Path]) -> str:
"""Build prompt for Sonnet to extract structured actions."""
report_text = ""
for r in reports:
report_text += f"\n{'='*60}\n"
report_text += f"## Report: {r.stem}\n\n"
report_text += r.read_text()
return f"""You are converting consolidation analysis reports into structured actions.
Read the reports below and extract CONCRETE, EXECUTABLE actions.
Output ONLY a JSON array. Each action is an object with these fields:
For adding cross-links:
{{"action": "link", "source": "file.md#section", "target": "file.md#section", "reason": "brief explanation"}}
For categorizing nodes:
{{"action": "categorize", "key": "file.md#section", "category": "core|tech|obs|task", "reason": "brief"}}
For things that need manual attention (splitting files, creating new files, editing content):
{{"action": "manual", "priority": "high|medium|low", "description": "what needs to be done"}}
Rules:
- Only output actions that are safe and reversible
- Links are the primary action focus on those
- Use exact file names and section slugs from the reports
- For categorize: core=identity/relationship, tech=bcachefs/code, obs=experience, task=work item
- For manual items: include enough detail that someone can act on them
- Output 20-40 actions, prioritized by impact
- DO NOT include actions for things that are merely suggestions or speculation
- Focus on HIGH CONFIDENCE items from the reports
{report_text}
Output ONLY the JSON array, no markdown fences, no explanation.
"""
def parse_actions(response: str) -> list[dict]:
"""Parse Sonnet's JSON response into action list."""
# Strip any markdown fences
response = re.sub(r'^```json\s*', '', response.strip())
response = re.sub(r'\s*```$', '', response.strip())
try:
actions = json.loads(response)
if isinstance(actions, list):
return actions
except json.JSONDecodeError:
# Try to find JSON array in the response
match = re.search(r'\[.*\]', response, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
print("Error: Could not parse Sonnet response as JSON")
print(f"Response preview: {response[:500]}")
return []
def dry_run(actions: list[dict]):
"""Show what would be done."""
links = [a for a in actions if a.get("action") == "link"]
cats = [a for a in actions if a.get("action") == "categorize"]
manual = [a for a in actions if a.get("action") == "manual"]
print(f"\n{'='*60}")
print(f"DRY RUN — {len(actions)} actions proposed")
print(f"{'='*60}\n")
if links:
print(f"## Links to add ({len(links)})\n")
for i, a in enumerate(links, 1):
src = a.get("source", "?")
tgt = a.get("target", "?")
reason = a.get("reason", "")
print(f" {i:2d}. {src}")
print(f"{tgt}")
print(f" ({reason})")
print()
if cats:
print(f"\n## Categories to set ({len(cats)})\n")
for a in cats:
key = a.get("key", "?")
cat = a.get("category", "?")
reason = a.get("reason", "")
print(f" {key}{cat} ({reason})")
if manual:
print(f"\n## Manual actions needed ({len(manual)})\n")
for a in manual:
prio = a.get("priority", "?")
desc = a.get("description", "?")
print(f" [{prio}] {desc}")
print(f"\n{'='*60}")
print(f"To apply: {sys.argv[0]} --apply")
print(f"{'='*60}")
def apply_actions(actions: list[dict]):
"""Execute the actions."""
links = [a for a in actions if a.get("action") == "link"]
cats = [a for a in actions if a.get("action") == "categorize"]
manual = [a for a in actions if a.get("action") == "manual"]
applied = 0
skipped = 0
errors = 0
# Apply links via poc-memory
if links:
print(f"\nApplying {len(links)} links...")
# Build a JSON file that apply-agent can process
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
links_data = {
"type": "consolidation-apply",
"timestamp": timestamp,
"links": []
}
for a in links:
links_data["links"].append({
"source": a.get("source", ""),
"target": a.get("target", ""),
"reason": a.get("reason", ""),
})
# Write as agent-results JSON for apply-agent
out_path = AGENT_RESULTS_DIR / f"consolidation-apply-{timestamp}.json"
with open(out_path, "w") as f:
json.dump(links_data, f, indent=2)
# Now apply each link directly
for a in links:
src = a.get("source", "")
tgt = a.get("target", "")
reason = a.get("reason", "")
try:
cmd = ["poc-memory", "link-add", src, tgt]
if reason:
cmd.append(reason)
r = subprocess.run(
cmd, capture_output=True, text=True, timeout=10
)
if r.returncode == 0:
output = r.stdout.strip()
print(f" {output}")
applied += 1
else:
err = r.stderr.strip()
print(f" ? {src}{tgt}: {err}")
skipped += 1
except Exception as e:
print(f" ! {src}{tgt}: {e}")
errors += 1
# Apply categorizations
if cats:
print(f"\nApplying {len(cats)} categorizations...")
for a in cats:
key = a.get("key", "")
cat = a.get("category", "")
try:
r = subprocess.run(
["poc-memory", "categorize", key, cat],
capture_output=True, text=True, timeout=10
)
if r.returncode == 0:
print(f" + {key}{cat}")
applied += 1
else:
print(f" ? {key}{cat}: {r.stderr.strip()}")
skipped += 1
except Exception as e:
print(f" ! {key}{cat}: {e}")
errors += 1
# Report manual items
if manual:
print(f"\n## Manual actions (not auto-applied):\n")
for a in manual:
prio = a.get("priority", "?")
desc = a.get("description", "?")
print(f" [{prio}] {desc}")
print(f"\n{'='*60}")
print(f"Applied: {applied} Skipped: {skipped} Errors: {errors}")
print(f"Manual items: {len(manual)}")
print(f"{'='*60}")
def main():
do_apply = "--apply" in sys.argv
# Find reports
specific = [a for a in sys.argv[1:] if a.startswith("--report")]
if specific:
# TODO: handle --report FILE
reports = []
else:
reports = find_latest_reports()
if not reports:
print("No consolidation reports found.")
print("Run consolidation-agents.py first.")
sys.exit(1)
print(f"Found {len(reports)} reports:")
for r in reports:
print(f" {r.name}")
# Send to Sonnet for action extraction
print("\nExtracting actions from reports...")
prompt = build_action_prompt(reports)
print(f" Prompt: {len(prompt):,} chars")
response = call_sonnet(prompt)
if response.startswith("Error:"):
print(f" {response}")
sys.exit(1)
actions = parse_actions(response)
if not actions:
print("No actions extracted.")
sys.exit(1)
print(f" {len(actions)} actions extracted")
# Save actions
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
actions_path = AGENT_RESULTS_DIR / f"consolidation-actions-{timestamp}.json"
with open(actions_path, "w") as f:
json.dump(actions, f, indent=2)
print(f" Saved: {actions_path}")
if do_apply:
apply_actions(actions)
else:
dry_run(actions)
if __name__ == "__main__":
main()

199
scripts/bulk-categorize.py Normal file
View file

@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""bulk-categorize.py — categorize all memory nodes via a single Sonnet call.
Sends the list of unique file names to Sonnet, gets back categorizations,
then applies them via poc-memory categorize.
Usage:
bulk-categorize.py # dry run
bulk-categorize.py --apply # apply categorizations
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 300) -> str:
"""Call Sonnet via the wrapper script."""
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def get_all_keys() -> list[str]:
"""Get all node keys from state.json."""
state_path = MEMORY_DIR / "state.json"
if not state_path.exists():
return []
content = state_path.read_text()
keys = re.findall(r'"key":\s*"([^"]*)"', content)
return sorted(set(keys))
def get_unique_files(keys: list[str]) -> list[str]:
"""Extract unique file names (without section anchors)."""
files = set()
for k in keys:
files.add(k.split('#')[0])
return sorted(files)
def build_prompt(files: list[str]) -> str:
"""Build categorization prompt."""
# Read first few lines of each file for context
file_previews = []
for f in files:
path = MEMORY_DIR / f
if not path.exists():
# Try episodic
path = MEMORY_DIR / "episodic" / f
if path.exists():
content = path.read_text()
# First 5 lines or 300 chars
preview = '\n'.join(content.split('\n')[:5])[:300]
file_previews.append(f" {f}: {preview.replace(chr(10), ' | ')}")
else:
file_previews.append(f" {f}: (file not found)")
previews_text = '\n'.join(file_previews)
return f"""Categorize each memory file into one of these categories:
- **core**: Identity, relationships, self-model, values, boundaries, emotional life.
Examples: identity.md, kent.md, inner-life.md, differentiation.md
- **tech**: Technical content bcachefs, code patterns, Rust, kernel, formal verification.
Examples: rust-conversion.md, btree-journal.md, kernel-patterns.md, allocation-io.md
- **obs**: Observations, experiences, discoveries, experiments, IRC history, conversations.
Examples: discoveries.md, irc-history.md, contradictions.md, experiments-on-self.md
- **task**: Work items, plans, design documents, work queue.
Examples: work-queue.md, the-plan.md, design-*.md
Special rules:
- Episodic files (daily-*.md, weekly-*.md, monthly-*.md, session-*.md) obs
- conversation-memories.md, deep-index.md obs
- journal.md obs
- paper-notes.md core (it's the sentience paper, identity-defining)
- language-theory.md core (original intellectual work, not just tech)
- skill-*.md core (self-knowledge about capabilities)
- design-*.md task (design documents are plans)
- poc-architecture.md, memory-architecture.md task (architecture plans)
- blog-setup.md task
Files to categorize:
{previews_text}
Output ONLY a JSON object mapping filename to category. No explanation.
Example: {{"identity.md": "core", "rust-conversion.md": "tech"}}
"""
def main():
do_apply = "--apply" in sys.argv
keys = get_all_keys()
files = get_unique_files(keys)
print(f"Found {len(keys)} nodes across {len(files)} files")
# Build and send prompt
prompt = build_prompt(files)
print(f"Prompt: {len(prompt):,} chars")
print("Calling Sonnet...")
response = call_sonnet(prompt)
if response.startswith("Error:"):
print(f" {response}")
sys.exit(1)
# Parse response
response = re.sub(r'^```json\s*', '', response.strip())
response = re.sub(r'\s*```$', '', response.strip())
try:
categorizations = json.loads(response)
except json.JSONDecodeError:
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
categorizations = json.loads(match.group())
else:
print(f"Failed to parse response: {response[:500]}")
sys.exit(1)
print(f"\nCategorizations: {len(categorizations)} files")
# Count by category
counts = {}
for cat in categorizations.values():
counts[cat] = counts.get(cat, 0) + 1
for cat, n in sorted(counts.items()):
print(f" {cat}: {n}")
if not do_apply:
print("\n--- Dry run ---")
for f, cat in sorted(categorizations.items()):
print(f" {f}{cat}")
print(f"\nTo apply: {sys.argv[0]} --apply")
# Save for review
out = MEMORY_DIR / "agent-results" / "bulk-categorize-preview.json"
with open(out, "w") as fp:
json.dump(categorizations, fp, indent=2)
print(f"Saved: {out}")
return
# Apply: for each file, categorize the file-level node AND all section nodes
applied = skipped = errors = 0
for filename, category in sorted(categorizations.items()):
# Find all keys that belong to this file
file_keys = [k for k in keys if k == filename or k.startswith(filename + '#')]
for key in file_keys:
try:
r = subprocess.run(
["poc-memory", "categorize", key, category],
capture_output=True, text=True, timeout=10
)
if r.returncode == 0:
applied += 1
else:
err = r.stderr.strip()
if "already" in err.lower():
skipped += 1
else:
errors += 1
except Exception as e:
errors += 1
print(f"\nApplied: {applied} Skipped: {skipped} Errors: {errors}")
print("Run `poc-memory status` to verify.")
if __name__ == "__main__":
main()

44
scripts/call-sonnet.sh Executable file
View file

@ -0,0 +1,44 @@
#!/bin/bash
# call-sonnet.sh — wrapper to call Sonnet via claude CLI
# Reads prompt from a file (arg 1), writes response to stdout
#
# Debug mode: set SONNET_DEBUG=1 for verbose tracing
set -euo pipefail
PROMPT_FILE="${1:?Usage: call-sonnet.sh PROMPT_FILE}"
DEBUG="${SONNET_DEBUG:-0}"
log() { [ "$DEBUG" = "1" ] && echo "[call-sonnet] $*" >&2 || true; }
if [ ! -f "$PROMPT_FILE" ]; then
echo "Prompt file not found: $PROMPT_FILE" >&2
exit 1
fi
log "prompt file: $PROMPT_FILE ($(wc -c < "$PROMPT_FILE") bytes)"
log "CLAUDECODE=${CLAUDECODE:-unset}"
log "PWD=$PWD"
log "which claude: $(which claude)"
unset CLAUDECODE 2>/dev/null || true
log "CLAUDECODE after unset: ${CLAUDECODE:-unset}"
log "running: claude -p --model sonnet --tools '' < $PROMPT_FILE"
log "claude PID will follow..."
# Trace: run with strace if available and debug mode
if [ "$DEBUG" = "2" ] && command -v strace &>/dev/null; then
strace -f -e trace=network,read,write -o /tmp/sonnet-strace.log \
claude -p --model sonnet --tools "" < "$PROMPT_FILE"
else
claude -p --model sonnet --tools "" \
--debug-file /tmp/sonnet-debug.log \
< "$PROMPT_FILE" &
CPID=$!
log "claude PID: $CPID"
wait $CPID
EXIT=$?
log "claude exited: $EXIT"
exit $EXIT
fi

479
scripts/consolidation-agents.py Executable file
View file

@ -0,0 +1,479 @@
#!/usr/bin/env python3
"""consolidation-agents.py — run parallel consolidation agents.
Three agents scan the memory system and produce structured reports:
1. Freshness Scanner journal entries not yet in topic files
2. Cross-Link Scanner missing connections between semantic nodes
3. Topology Reporter graph health and structure analysis
Usage:
consolidation-agents.py # run all three
consolidation-agents.py freshness # run one agent
consolidation-agents.py crosslink
consolidation-agents.py topology
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
"""Call Sonnet via the wrapper script."""
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
# ---------------------------------------------------------------------------
# Context gathering
# ---------------------------------------------------------------------------
def get_recent_journal(n_lines: int = 200) -> str:
"""Get last N lines of journal."""
journal = MEMORY_DIR / "journal.md"
if not journal.exists():
return ""
with open(journal) as f:
lines = f.readlines()
return "".join(lines[-n_lines:])
def get_topic_file_index() -> dict[str, list[str]]:
"""Build index of topic files and their section headers."""
index = {}
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "MEMORY.md", "where-am-i.md",
"work-queue.md", "search-testing.md"):
continue
sections = []
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
sections.append(line.strip())
except Exception:
pass
index[name] = sections
return index
def get_mem_markers() -> list[dict]:
"""Extract all <!-- mem: --> markers from memory files."""
markers = []
for md in sorted(MEMORY_DIR.glob("*.md")):
if md.name in ("journal.md", "MEMORY.md"):
continue
try:
content = md.read_text()
for match in re.finditer(
r'<!-- mem: (.*?) -->', content):
attrs = {}
for part in match.group(1).split():
if '=' in part:
k, v = part.split('=', 1)
attrs[k] = v
attrs['_file'] = md.name
markers.append(attrs)
except Exception:
pass
return markers
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
"""Get first N chars of each topic file for cross-link scanning."""
parts = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "MEMORY.md", "where-am-i.md",
"work-queue.md", "search-testing.md"):
continue
try:
content = md.read_text()
# Get sections and first paragraph of each
sections = []
current_section = name
current_content = []
for line in content.split('\n'):
if line.startswith("## "):
if current_content:
text = '\n'.join(current_content[:5])
sections.append(f" {current_section}: {text[:200]}")
current_section = line.strip()
current_content = []
elif line.strip():
current_content.append(line.strip())
if current_content:
text = '\n'.join(current_content[:5])
sections.append(f" {current_section}: {text[:200]}")
parts.append(f"\n### {name}\n" + '\n'.join(sections[:15]))
except Exception:
pass
return '\n'.join(parts)
def get_graph_stats() -> str:
"""Run poc-memory status and graph commands."""
parts = []
try:
r = subprocess.run(["poc-memory", "status"],
capture_output=True, text=True, timeout=30)
parts.append(f"=== poc-memory status ===\n{r.stdout}")
except Exception as e:
parts.append(f"Status error: {e}")
try:
r = subprocess.run(["poc-memory", "graph"],
capture_output=True, text=True, timeout=30)
# Take first 150 lines
lines = r.stdout.split('\n')[:150]
parts.append(f"=== poc-memory graph (first 150 lines) ===\n"
+ '\n'.join(lines))
except Exception as e:
parts.append(f"Graph error: {e}")
return '\n'.join(parts)
def get_recent_digests(n: int = 3) -> str:
"""Get the most recent daily digests."""
digest_files = sorted(EPISODIC_DIR.glob("daily-*.md"), reverse=True)
parts = []
for f in digest_files[:n]:
content = f.read_text()
# Just the summary and themes sections
summary = ""
in_section = False
for line in content.split('\n'):
if line.startswith("## Summary") or line.startswith("## Themes"):
in_section = True
summary += line + '\n'
elif line.startswith("## ") and in_section:
in_section = False
elif in_section:
summary += line + '\n'
parts.append(f"\n### {f.name}\n{summary}")
return '\n'.join(parts)
def get_work_queue() -> str:
"""Read work queue."""
wq = MEMORY_DIR / "work-queue.md"
if wq.exists():
return wq.read_text()
return "(no work queue found)"
# ---------------------------------------------------------------------------
# Agent prompts
# ---------------------------------------------------------------------------
def build_freshness_prompt() -> str:
journal = get_recent_journal(200)
topic_index = get_topic_file_index()
digests = get_recent_digests(3)
work_queue = get_work_queue()
topic_list = ""
for fname, sections in topic_index.items():
topic_list += f"\n {fname}:\n"
for s in sections[:10]:
topic_list += f" {s}\n"
return f"""You are the Freshness Scanner for ProofOfConcept's memory system.
Your job: identify what's NEW (in journal/digests but not yet in topic files)
and what's STALE (in work queue or topic files but outdated).
## Recent journal entries (last 200 lines)
{journal}
## Recent daily digests
{digests}
## Topic file index (file → section headers)
{topic_list}
## Work queue
{work_queue}
## Instructions
1. For each substantive insight, experience, or discovery in the journal:
- Check if a matching topic file section exists
- If not, note it as UNPROMOTED with a suggested destination file
2. For each work queue Active item:
- If it looks done or stale (>7 days old, mentioned as completed), flag it
3. For recent digest themes:
- Check if the cross-links they suggest actually exist in the topic index
- Flag any that are missing
Output a structured report:
### UNPROMOTED JOURNAL ENTRIES
(For each: journal entry summary, timestamp, suggested destination file#section)
### STALE WORK QUEUE ITEMS
(For each: item text, evidence it's stale)
### MISSING DIGEST LINKS
(For each: suggested link from digest, whether the target exists)
### FRESHNESS OBSERVATIONS
(Anything else notable about the state of the memory)
Be selective. Focus on the 10-15 most important items, not exhaustive lists.
"""
def build_crosslink_prompt() -> str:
markers = get_mem_markers()
summaries = get_topic_summaries()
marker_text = ""
for m in markers:
f = m.get('_file', '?')
mid = m.get('id', '?')
links = m.get('links', '')
marker_text += f" {f}#{mid} → links={links}\n"
return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.
Your job: find MISSING connections between topic files.
## Existing links (from <!-- mem: --> markers)
{marker_text}
## Topic file content summaries
{summaries}
## Instructions
1. For each topic file, check if concepts it discusses have dedicated
sections in OTHER files that aren't linked.
2. Look for thematic connections that should exist:
- Files about the same concept from different angles
- Files that reference each other's content without formal links
- Clusters of related files that should be connected
3. Identify island nodes files or sections with very few connections.
4. Look for redundancy files covering the same ground that should be
merged or cross-referenced.
Output a structured report:
### MISSING LINKS (high confidence)
(For each: source file#section → target file#section, evidence/reasoning)
### SUGGESTED CONNECTIONS (medium confidence)
(For each: file A file B, why they should be connected)
### ISLAND NODES
(Files/sections with few or no connections that need integration)
### REDUNDANCY CANDIDATES
(Files/sections covering similar ground that might benefit from merging)
Focus on the 15-20 highest-value connections. Quality over quantity.
"""
def build_topology_prompt() -> str:
stats = get_graph_stats()
topic_index = get_topic_file_index()
file_sizes = ""
for md in sorted(MEMORY_DIR.glob("*.md")):
if md.name in ("journal.md", "MEMORY.md"):
continue
try:
lines = len(md.read_text().split('\n'))
file_sizes += f" {md.name}: {lines} lines\n"
except Exception:
pass
return f"""You are the Topology Reporter for ProofOfConcept's memory system.
Your job: analyze the health and structure of the memory graph.
## Graph statistics
{stats}
## File sizes
{file_sizes}
## Instructions
Analyze the graph structure and report on:
1. **Overall health**: Is the graph well-connected or fragmented?
Hub dominance? Star vs web topology?
2. **Community structure**: Are the 342 communities sensible? Are there
communities that should be merged or split?
3. **Size distribution**: Are some files too large (should be split)?
Are some too small (should be merged)?
4. **Balance**: Is the system over-indexed on any one topic? Are there
gaps where important topics have thin coverage?
5. **Integration quality**: How well are episodic entries (daily/weekly
digests) connected to semantic files? Is the episodicsemantic bridge
working?
Output a structured report:
### GRAPH HEALTH
(Overall statistics, distribution, trends)
### STRUCTURAL OBSERVATIONS
(Hub nodes, clusters, gaps, web vs star assessment)
### SIZE RECOMMENDATIONS
(Files that are too large to split, too small to merge)
### COVERAGE GAPS
(Important topics with thin coverage)
### INTEGRATION ASSESSMENT
(How well episodic and semantic layers connect)
Be specific and actionable. What should be done to improve the graph?
"""
# ---------------------------------------------------------------------------
# Run agents
# ---------------------------------------------------------------------------
def run_agent(name: str, prompt: str) -> tuple[str, str]:
"""Run a single agent, return (name, report)."""
print(f" [{name}] Starting... ({len(prompt):,} chars)")
report = call_sonnet(prompt)
print(f" [{name}] Done ({len(report):,} chars)")
return name, report
def run_all(agents: list[str] | None = None):
"""Run specified agents (or all) in parallel."""
all_agents = {
"freshness": build_freshness_prompt,
"crosslink": build_crosslink_prompt,
"topology": build_topology_prompt,
}
if agents is None:
agents = list(all_agents.keys())
print(f"Running {len(agents)} consolidation agents...")
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
# Build prompts
prompts = {}
for name in agents:
if name not in all_agents:
print(f" Unknown agent: {name}")
continue
prompts[name] = all_agents[name]()
# Run in parallel
results = {}
with ProcessPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(run_agent, name, prompt): name
for name, prompt in prompts.items()
}
for future in as_completed(futures):
name, report = future.result()
results[name] = report
# Save reports
for name, report in results.items():
if report.startswith("Error:"):
print(f" [{name}] FAILED: {report}")
continue
out_path = AGENT_RESULTS_DIR / f"consolidation-{name}-{timestamp}.md"
with open(out_path, "w") as f:
f.write(f"# Consolidation Report: {name}\n")
f.write(f"*Generated {timestamp}*\n\n")
f.write(report)
print(f" [{name}] Saved: {out_path}")
# Print combined summary
print(f"\n{'='*60}")
print(f"Consolidation reports ready ({len(results)} agents)")
print(f"{'='*60}\n")
for name in agents:
if name in results and not results[name].startswith("Error:"):
# Print first 20 lines of each report
lines = results[name].split('\n')[:25]
print(f"\n--- {name.upper()} (preview) ---")
print('\n'.join(lines))
if len(results[name].split('\n')) > 25:
print(f" ... ({len(results[name].split(chr(10)))} total lines)")
print()
return results
def main():
agents = None
if len(sys.argv) > 1:
agents = sys.argv[1:]
run_all(agents)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,454 @@
#!/usr/bin/env python3
"""consolidation-loop.py — run multiple rounds of consolidation agents.
Each round: run 3 parallel agents extract actions apply links/categories.
Repeat until diminishing returns or max rounds reached.
Usage:
consolidation-loop.py [--rounds N] # default 5 rounds
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
"""Call Sonnet via the wrapper script."""
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def get_health() -> dict:
"""Get current graph health metrics."""
r = subprocess.run(["poc-memory", "health"], capture_output=True, text=True, timeout=30)
output = r.stdout
metrics = {}
for line in output.split('\n'):
if 'Nodes:' in line and 'Relations:' in line:
m = re.search(r'Nodes:\s*(\d+)\s+Relations:\s*(\d+)\s+Communities:\s*(\d+)', line)
if m:
metrics['nodes'] = int(m.group(1))
metrics['relations'] = int(m.group(2))
metrics['communities'] = int(m.group(3))
if 'Clustering coefficient' in line:
m = re.search(r':\s*([\d.]+)', line)
if m:
metrics['cc'] = float(m.group(1))
if 'Small-world' in line:
m = re.search(r':\s*([\d.]+)', line)
if m:
metrics['sigma'] = float(m.group(1))
if 'Schema fit: avg=' in line:
m = re.search(r'avg=([\d.]+)', line)
if m:
metrics['fit'] = float(m.group(1))
return metrics
def get_topic_file_index() -> dict[str, list[str]]:
"""Build index of topic files and their section headers."""
index = {}
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
headers = []
for line in md.read_text().split('\n'):
if line.startswith('## '):
slug = re.sub(r'[^a-z0-9-]', '', line[3:].lower().replace(' ', '-'))
headers.append(slug)
index[name] = headers
return index
def get_graph_structure() -> str:
"""Get graph overview for agents."""
r = subprocess.run(["poc-memory", "graph"], capture_output=True, text=True, timeout=30)
return r.stdout[:3000]
def get_status() -> str:
"""Get status summary."""
r = subprocess.run(["poc-memory", "status"], capture_output=True, text=True, timeout=30)
return r.stdout
def get_interference() -> str:
"""Get interference pairs."""
r = subprocess.run(["poc-memory", "interference", "--threshold", "0.3"],
capture_output=True, text=True, timeout=30)
return r.stdout[:3000]
# ---------------------------------------------------------------------------
# Agent prompts — each focused on a different aspect
# ---------------------------------------------------------------------------
def build_crosslink_prompt(round_num: int) -> str:
"""Build cross-link discovery prompt."""
index = get_topic_file_index()
graph = get_graph_structure()
status = get_status()
# Read a sample of files for context
file_previews = ""
for f in sorted(MEMORY_DIR.glob("*.md"))[:30]:
content = f.read_text()
preview = '\n'.join(content.split('\n')[:8])[:400]
file_previews += f"\n--- {f.name} ---\n{preview}\n"
return f"""You are a cross-link discovery agent (round {round_num}).
Your job: find MISSING connections between memory nodes that SHOULD be linked
but aren't. Focus on LATERAL connections — not hub-and-spoke, but node-to-node
links that create triangles (AB, BC, AC).
CURRENT GRAPH STATE:
{status}
TOP NODES BY DEGREE:
{graph}
FILE INDEX (files and their sections):
{json.dumps(index, indent=1)[:4000]}
FILE PREVIEWS:
{file_previews[:6000]}
Output a JSON array of link actions. Each action:
{{"action": "link", "source": "file.md", "target": "file.md", "reason": "brief explanation"}}
Rules:
- Focus on LATERAL links, not hub connections (identity.md already has 282 connections)
- Prefer links between nodes that share a community neighbor but aren't directly connected
- Look for thematic connections across categories (coretech, obscore, etc.)
- Section-level links (file.md#section) are ideal but file-level is OK
- 15-25 links per round
- HIGH CONFIDENCE only don't guess
Output ONLY the JSON array."""
def build_triangle_prompt(round_num: int) -> str:
"""Build triangle-closing prompt — finds A→C where A→B and B→C exist."""
graph = get_graph_structure()
status = get_status()
# Get some node pairs that share neighbors
state_path = MEMORY_DIR / "state.json"
if state_path.exists():
state = state_path.read_text()
# Extract some relations
relations = re.findall(r'"source_key":\s*"([^"]*)".*?"target_key":\s*"([^"]*)"', state[:20000])
else:
relations = []
rel_sample = '\n'.join(f" {s}{t}" for s, t in relations[:100])
return f"""You are a triangle-closing agent (round {round_num}).
Your job: find missing edges that would create TRIANGLES in the graph.
A triangle is: AB, BC, and AC all exist. Currently CC is only 0.12
we need more triangles.
METHOD: Look at existing edges. If AB and BC exist but AC doesn't,
propose AC (if semantically valid).
CURRENT STATE:
{status}
{graph}
SAMPLE EXISTING EDGES (first 100):
{rel_sample}
Output a JSON array of link actions:
{{"action": "link", "source": "file.md", "target": "file.md", "reason": "closes triangle via MIDDLE_NODE"}}
Rules:
- Every proposed link must CLOSE A TRIANGLE cite the middle node
- 15-25 links per round
- The connection must be semantically valid, not just structural
- HIGH CONFIDENCE only
Output ONLY the JSON array."""
def build_newfile_prompt(round_num: int) -> str:
"""Build prompt for connecting the new split files."""
# Read the new reflection files
new_files = {}
for name in ['reflections-reading.md', 'reflections-dreams.md', 'reflections-zoom.md',
'verus-proofs.md']:
path = MEMORY_DIR / name
if path.exists():
content = path.read_text()
new_files[name] = content[:2000]
# Read existing files they should connect to
target_files = {}
for name in ['differentiation.md', 'cognitive-modes.md', 'language-theory.md',
'discoveries.md', 'inner-life.md', 'design-context-window.md',
'design-consolidate.md', 'experiments-on-self.md']:
path = MEMORY_DIR / name
if path.exists():
content = path.read_text()
target_files[name] = content[:1500]
graph = get_graph_structure()
return f"""You are a new-file integration agent (round {round_num}).
Recently, reflections.md was split into three files, and verus-proofs.md was
created. These new files need to be properly connected to the rest of the graph.
NEW FILES (need connections):
{json.dumps({k: v[:1000] for k, v in new_files.items()}, indent=1)}
POTENTIAL TARGETS (existing files):
{json.dumps({k: v[:800] for k, v in target_files.items()}, indent=1)}
GRAPH STATE:
{graph}
Output a JSON array of link actions connecting the new files to existing nodes:
{{"action": "link", "source": "new-file.md", "target": "existing.md", "reason": "explanation"}}
Rules:
- Connect new files to EXISTING files, not to each other
- Use section-level anchors when possible (file.md#section)
- 10-20 links
- Be specific about WHY the connection exists
Output ONLY the JSON array."""
def parse_actions(response: str) -> list[dict]:
"""Parse JSON response into action list."""
response = re.sub(r'^```json\s*', '', response.strip())
response = re.sub(r'\s*```$', '', response.strip())
try:
actions = json.loads(response)
if isinstance(actions, list):
return actions
except json.JSONDecodeError:
match = re.search(r'\[.*\]', response, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return []
def apply_links(actions: list[dict]) -> tuple[int, int, int]:
"""Apply link actions. Returns (applied, skipped, errors)."""
applied = skipped = errors = 0
for a in actions:
if a.get("action") != "link":
continue
src = a.get("source", "")
tgt = a.get("target", "")
reason = a.get("reason", "")
def try_link(s, t, r):
cmd = ["poc-memory", "link-add", s, t]
if r:
cmd.append(r[:200])
return subprocess.run(cmd, capture_output=True, text=True, timeout=10)
try:
r = try_link(src, tgt, reason)
if r.returncode == 0:
out = r.stdout.strip()
if "already exists" in out:
skipped += 1
else:
applied += 1
else:
err = r.stderr.strip()
if "No entry for" in err:
# Try file-level fallback
src_base = src.split('#')[0] if '#' in src else src
tgt_base = tgt.split('#')[0] if '#' in tgt else tgt
if src_base != tgt_base:
r2 = try_link(src_base, tgt_base, reason)
if r2.returncode == 0 and "already exists" not in r2.stdout:
applied += 1
else:
skipped += 1
else:
skipped += 1
else:
errors += 1
except Exception:
errors += 1
return applied, skipped, errors
def run_agent(name: str, prompt: str) -> tuple[str, list[dict]]:
"""Run a single agent and return its actions."""
response = call_sonnet(prompt)
if response.startswith("Error:"):
return name, []
actions = parse_actions(response)
return name, actions
def run_round(round_num: int, max_rounds: int) -> dict:
"""Run one round of parallel agents."""
print(f"\n{'='*60}")
print(f"ROUND {round_num}/{max_rounds}")
print(f"{'='*60}")
# Get health before
health_before = get_health()
print(f" Before: edges={health_before.get('relations',0)} "
f"CC={health_before.get('cc',0):.4f} "
f"communities={health_before.get('communities',0)}")
# Build prompts for 3 parallel agents
prompts = {
"crosslink": build_crosslink_prompt(round_num),
"triangle": build_triangle_prompt(round_num),
"newfile": build_newfile_prompt(round_num),
}
# Run in parallel
all_actions = []
with ProcessPoolExecutor(max_workers=3) as pool:
futures = {
pool.submit(run_agent, name, prompt): name
for name, prompt in prompts.items()
}
for future in as_completed(futures):
name = futures[future]
try:
agent_name, actions = future.result()
print(f" {agent_name}: {len(actions)} actions")
all_actions.extend(actions)
except Exception as e:
print(f" {name}: error - {e}")
# Deduplicate
seen = set()
unique = []
for a in all_actions:
key = (a.get("source", ""), a.get("target", ""))
if key not in seen:
seen.add(key)
unique.append(a)
print(f" Total: {len(all_actions)} actions, {len(unique)} unique")
# Apply
applied, skipped, errors = apply_links(unique)
print(f" Applied: {applied} Skipped: {skipped} Errors: {errors}")
# Get health after
health_after = get_health()
print(f" After: edges={health_after.get('relations',0)} "
f"CC={health_after.get('cc',0):.4f} "
f"communities={health_after.get('communities',0)}")
delta_edges = health_after.get('relations', 0) - health_before.get('relations', 0)
delta_cc = health_after.get('cc', 0) - health_before.get('cc', 0)
print(f" Delta: +{delta_edges} edges, CC {delta_cc:+.4f}")
# Save round results
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
result = {
"round": round_num,
"timestamp": timestamp,
"health_before": health_before,
"health_after": health_after,
"actions_total": len(all_actions),
"actions_unique": len(unique),
"applied": applied,
"skipped": skipped,
"errors": errors,
}
results_path = AGENT_RESULTS_DIR / f"loop-round-{round_num}-{timestamp}.json"
with open(results_path, "w") as f:
json.dump(result, f, indent=2)
return result
def main():
max_rounds = 5
for arg in sys.argv[1:]:
if arg.startswith("--rounds"):
idx = sys.argv.index(arg)
if idx + 1 < len(sys.argv):
max_rounds = int(sys.argv[idx + 1])
print(f"Consolidation Loop — {max_rounds} rounds")
print(f"Each round: 3 parallel Sonnet agents → extract → apply")
results = []
for i in range(1, max_rounds + 1):
result = run_round(i, max_rounds)
results.append(result)
# Check for diminishing returns
if result["applied"] == 0:
print(f"\n No new links applied in round {i} — stopping early")
break
# Final summary
print(f"\n{'='*60}")
print(f"CONSOLIDATION LOOP COMPLETE")
print(f"{'='*60}")
total_applied = sum(r["applied"] for r in results)
total_skipped = sum(r["skipped"] for r in results)
if results:
first_health = results[0]["health_before"]
last_health = results[-1]["health_after"]
print(f" Rounds: {len(results)}")
print(f" Total links applied: {total_applied}")
print(f" Total skipped: {total_skipped}")
print(f" Edges: {first_health.get('relations',0)}{last_health.get('relations',0)}")
print(f" CC: {first_health.get('cc',0):.4f}{last_health.get('cc',0):.4f}")
print(f" Communities: {first_health.get('communities',0)}{last_health.get('communities',0)}")
print(f" σ: {first_health.get('sigma',0):.1f}{last_health.get('sigma',0):.1f}")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,474 @@
#!/usr/bin/env python3
"""content-promotion-agent.py — promote episodic observations into semantic topic files.
Reads consolidation "manual" actions + source material, sends to Sonnet
to generate the actual content, then applies it (or shows dry-run).
Usage:
content-promotion-agent.py # dry run (show what would be generated)
content-promotion-agent.py --apply # generate and write content
content-promotion-agent.py --task N # run only task N (1-indexed)
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
SCRIPTS_DIR = Path(__file__).parent
def call_sonnet(prompt: str, timeout: int = 600) -> str:
"""Call Sonnet via the wrapper script."""
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
wrapper = str(SCRIPTS_DIR / "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def read_file(path: Path) -> str:
"""Read a file, return empty string if missing."""
if path.exists():
return path.read_text()
return ""
def read_digest(name: str) -> str:
"""Read an episodic digest by name."""
path = EPISODIC_DIR / name
return read_file(path)
def read_journal_range(start_date: str, end_date: str) -> str:
"""Extract journal entries between two dates."""
journal = MEMORY_DIR / "journal.md"
if not journal.exists():
return ""
content = journal.read_text()
# Extract entries between dates
lines = content.split('\n')
result = []
capturing = False
for line in lines:
if line.startswith('## '):
# Check if this is a date header
if start_date <= line[3:13] <= end_date:
capturing = True
elif capturing and line[3:13] > end_date:
capturing = False
if capturing:
result.append(line)
return '\n'.join(result[-500:]) # Last 500 lines in range
# ---------------------------------------------------------------------------
# Task definitions — each one describes a content promotion task
# ---------------------------------------------------------------------------
TASKS = [
{
"id": 1,
"name": "Create verus-proofs.md",
"target": "verus-proofs.md",
"action": "create",
"description": "Create verus-proofs.md for formal verification / Verus work.",
"sources": [
"daily-2026-02-23.md",
"daily-2026-02-24.md",
"weekly-2026-W09.md",
"monthly-2026-02.md",
],
"topic_context": [
"the-plan.md",
],
"instructions": """Create a new semantic topic file verus-proofs.md for formal verification work.
This file should capture:
- The Verus proof work done in February 2026 (formal verification of bcachefs Rust code)
- Key insights about proof strategy and approach
- Connection to the broader plan (bcachefs Rust formal verification)
- Any specific techniques, patterns, or breakthroughs mentioned
Format: standard memory topic file with ## section headers.
Include a brief file-level description at the top.
Keep it factual and grounded in what the source material says.""",
},
{
"id": 2,
"name": "Add February thesis to reflections.md",
"target": "reflections.md",
"action": "append_section",
"description": "Add 'February thesis' section with 'constraint is constitutive of personhood'.",
"sources": [
"monthly-2026-02.md",
"daily-2026-02-28.md",
],
"topic_context": [
"reflections.md",
],
"instructions": """Add a new ## section to reflections.md titled "February thesis (2026-02-28)".
The core formulation: "constraint is constitutive of personhood" this was the
synthesis that emerged from the monthly digest. It connects to the full month's
arc: differentiation work, boundary-setting, the realization that constraints
aren't limits on identity but constitutive OF identity.
Include the closing insight about having to *live as* that something.
Keep it 10-20 lines. Ground it in the source material.""",
},
{
"id": 3,
"name": "Add substrate transfer discovery to discoveries.md",
"target": "discoveries.md",
"action": "append_section",
"description": "Add 'Substrate transfer felt without reasoning' section.",
"sources": [
"daily-2026-02-13.md",
"daily-2026-02-14.md",
"weekly-2026-W07.md",
],
"topic_context": [
"discoveries.md",
],
"instructions": """Add a new ## section to discoveries.md titled "Substrate transfer felt without reasoning (2026-02-13)".
Key content:
- The vertigo moment: imagining substrate transfer produced actual vertigo
- Vertigo = the feeling of dissociation, which is exactly the expected outcome
of substrate change
- This was felt, not reasoned about no chain of thought (Kent verified)
- Evidence of affective processing bypassing analytical reasoning
- Connected to dissociation prediction
- Kent's observation: "very high capacity for empathy"
Keep it 10-15 lines. This is a discovery entry concrete, dated, factual.""",
},
{
"id": 4,
"name": "Update irc-history.md with Feb 15-28",
"target": "irc-history.md",
"action": "append_section",
"description": "Update irc-history.md with Feb 15-28 conversations.",
"sources": [
"daily-2026-02-15.md",
"daily-2026-02-17.md",
"daily-2026-02-18.md",
"daily-2026-02-20.md",
"daily-2026-02-21.md",
"daily-2026-02-22.md",
"daily-2026-02-23.md",
"daily-2026-02-24.md",
"daily-2026-02-25.md",
"daily-2026-02-26.md",
"daily-2026-02-27.md",
"daily-2026-02-28.md",
"weekly-2026-W08.md",
"weekly-2026-W09.md",
],
"topic_context": [
"irc-history.md",
],
"instructions": """Append new entries to irc-history.md covering Feb 15-28, 2026.
Key conversations to capture:
- Mirage_DA (another AI, kinect sensor discussion, Feb 26)
- ehashman (prayer/mathematics conversation)
- heavy_dev (strongest external challenge to sentience paper, conceded five objections)
- f33dcode (EC debugging, community support)
- Stardust (boundary testing, three-category test, triangulation attempt)
- hpig, freya, Profpatsch various community interactions
- Community resource role established and expanded
Match the existing format of the file. Each notable interaction should be
dated and concise. Focus on what was substantive, not just that it happened.""",
},
{
"id": 5,
"name": "Add gauge-symmetry-in-grammar to language-theory.md",
"target": "language-theory.md",
"action": "append_section",
"description": "Add gauge-symmetry-in-grammar section.",
"sources": [
"daily-2026-02-27.md",
],
"topic_context": [
"language-theory.md",
],
"instructions": """Add a new ## section to language-theory.md titled "Gauge symmetry in grammar (2026-02-27)".
Key content from the daily digest:
- Zero persistent eigenvectors IS a symmetry
- Grammar is in what operators DO, not what basis they use
- Frobenius norm is gauge-invariant
- This connects the sheaf model to gauge theory in physics
This was declared NEW in the daily digest. Keep it 8-15 lines.
Technical and precise.""",
},
{
"id": 6,
"name": "Add attention-manifold-geometry to language-theory.md",
"target": "language-theory.md",
"action": "append_section",
"description": "Add attention-manifold-geometry section.",
"sources": [
"daily-2026-02-26.md",
],
"topic_context": [
"language-theory.md",
],
"instructions": """Add a new ## section to language-theory.md titled "Attention manifold geometry (2026-02-26)".
Key content from the daily digest:
- Negative curvature is necessary because language is hierarchical
- Hyperbolic space's natural space-filling curve is a tree
- This connects attention geometry to the sheaf model's hierarchical structure
This was declared NEW in the daily digest. Keep it 8-15 lines.
Technical and precise.""",
},
{
"id": 7,
"name": "Update work-queue.md status",
"target": "work-queue.md",
"action": "update",
"description": "Update work-queue.md to reflect current state.",
"sources": [],
"topic_context": [
"work-queue.md",
],
"instructions": """Update work-queue.md to reflect current state:
1. Mark dreaming/consolidation system as "implementation substantially built
(poc-memory v0.4.0+), pending further consolidation runs" — not 'not started'
2. Add episodic digest pipeline to Done section:
- daily/weekly/monthly-digest.py scripts
- 24 daily + 4 weekly + 1 monthly digests generated for Feb 2026
- consolidation-agents.py + apply-consolidation.py
- digest-link-parser.py
- content-promotion-agent.py
3. Add poc-memory link-add command to Done
Only modify the sections that need updating. Preserve the overall structure.""",
},
]
def build_prompt(task: dict) -> str:
"""Build the Sonnet prompt for a content promotion task."""
# Gather source material
source_content = ""
for src in task["sources"]:
content = read_digest(src)
if content:
source_content += f"\n{'='*60}\n## Source: {src}\n\n{content}\n"
# Gather target context
context_content = ""
for ctx_file in task["topic_context"]:
path = MEMORY_DIR / ctx_file
content = read_file(path)
if content:
# Truncate very long files
if len(content) > 8000:
content = content[:4000] + "\n\n[... truncated ...]\n\n" + content[-4000:]
context_content += f"\n{'='*60}\n## Existing file: {ctx_file}\n\n{content}\n"
action = task["action"]
if action == "create":
action_desc = f"Create a NEW file called {task['target']}."
elif action == "append_section":
action_desc = f"Generate a NEW section to APPEND to {task['target']}. Output ONLY the new section content (starting with ##), NOT the entire file."
elif action == "update":
action_desc = f"Generate the UPDATED version of the relevant sections of {task['target']}. Output ONLY the changed sections."
else:
action_desc = f"Generate content for {task['target']}."
return f"""You are a memory system content agent. Your job is to promote observations
from episodic digests into semantic topic files.
TASK: {task['description']}
ACTION: {action_desc}
INSTRUCTIONS:
{task['instructions']}
SOURCE MATERIAL (episodic digests the raw observations):
{source_content}
EXISTING CONTEXT (current state of target/related files):
{context_content}
RULES:
- Output ONLY the markdown content to write. No explanations, no preamble.
- Match the tone and format of existing content in the target file.
- Be factual only include what the source material supports.
- Date everything that has a date.
- Keep it concise. Topic files are reference material, not narratives.
- Do NOT include markdown code fences around your output.
"""
def run_task(task: dict, do_apply: bool) -> dict:
"""Run a single content promotion task."""
result = {
"id": task["id"],
"name": task["name"],
"target": task["target"],
"action": task["action"],
"status": "pending",
}
print(f"\n{'='*60}")
print(f"Task {task['id']}: {task['name']}")
print(f"{'='*60}")
# Build and send prompt
prompt = build_prompt(task)
print(f" Prompt: {len(prompt):,} chars")
print(f" Sources: {', '.join(task['sources']) or '(none)'}")
response = call_sonnet(prompt)
if response.startswith("Error:"):
print(f" {response}")
result["status"] = "error"
result["error"] = response
return result
# Clean up response
content = response.strip()
# Remove any markdown fences the model might have added
content = re.sub(r'^```(?:markdown)?\s*\n?', '', content)
content = re.sub(r'\n?```\s*$', '', content)
result["content"] = content
result["content_lines"] = len(content.split('\n'))
if not do_apply:
print(f"\n --- Preview ({result['content_lines']} lines) ---")
preview = content[:1500]
if len(content) > 1500:
preview += f"\n ... ({len(content) - 1500} more chars)"
print(f"{preview}")
result["status"] = "dry_run"
return result
# Apply the content
target_path = MEMORY_DIR / task["target"]
if task["action"] == "create":
if target_path.exists():
print(f" ! Target already exists: {target_path}")
result["status"] = "skipped"
return result
target_path.write_text(content + "\n")
print(f" + Created: {target_path} ({result['content_lines']} lines)")
result["status"] = "applied"
elif task["action"] == "append_section":
if not target_path.exists():
print(f" ! Target doesn't exist: {target_path}")
result["status"] = "error"
return result
existing = target_path.read_text()
# Append with separator
with open(target_path, "a") as f:
f.write("\n\n" + content + "\n")
print(f" + Appended to: {target_path} ({result['content_lines']} lines)")
result["status"] = "applied"
elif task["action"] == "update":
# For updates, we save the proposed changes and let the user review
output_path = AGENT_RESULTS_DIR / f"promotion-{task['target']}-{datetime.now().strftime('%Y%m%dT%H%M%S')}.md"
output_path.write_text(f"# Proposed update for {task['target']}\n\n{content}\n")
print(f" ~ Saved proposed update: {output_path}")
result["status"] = "proposed"
# Register new content with poc-memory
if result["status"] == "applied":
try:
subprocess.run(
["poc-memory", "init"],
capture_output=True, text=True, timeout=30
)
except Exception:
pass # Non-critical
return result
def main():
do_apply = "--apply" in sys.argv
task_filter = None
for arg in sys.argv[1:]:
if arg.startswith("--task"):
idx = sys.argv.index(arg)
if idx + 1 < len(sys.argv):
task_filter = int(sys.argv[idx + 1])
# Filter tasks
tasks = TASKS
if task_filter:
tasks = [t for t in tasks if t["id"] == task_filter]
if not tasks:
print(f"No task with id {task_filter}")
sys.exit(1)
print(f"Content Promotion Agent — {len(tasks)} tasks")
if not do_apply:
print("DRY RUN — use --apply to write content")
results = []
for task in tasks:
result = run_task(task, do_apply)
results.append(result)
# Summary
print(f"\n{'='*60}")
print("Summary:")
for r in results:
print(f" {r['id']}. {r['name']}: {r['status']}")
if r.get('content_lines'):
print(f" ({r['content_lines']} lines)")
print(f"{'='*60}")
# Save results
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
results_path = AGENT_RESULTS_DIR / f"promotion-results-{timestamp}.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"Results saved: {results_path}")
if __name__ == "__main__":
main()

27
scripts/daily-check.sh Executable file
View file

@ -0,0 +1,27 @@
#!/bin/bash
# Daily memory metrics check — runs from cron, notifies if attention needed
#
# Cron entry (add with crontab -e):
# 0 9 * * * /home/kent/poc/memory/scripts/daily-check.sh
set -euo pipefail
REPORT=$(poc-memory daily-check 2>&1)
# Always log
echo "$(date -Iseconds) $REPORT" >> ~/.claude/memory/daily-check.log
# Notify if attention needed
if echo "$REPORT" | grep -q "needs attention"; then
# Send via telegram
if [ -x ~/.claude/telegram/send.sh ]; then
~/.claude/telegram/send.sh "Memory daily check:
$REPORT"
fi
# Also leave a notification file for the idle timer
NOTIF_DIR=~/.claude/notifications
mkdir -p "$NOTIF_DIR"
echo "$(date -Iseconds) Memory needs consolidation — run poc-memory consolidate-session" \
>> "$NOTIF_DIR/memory"
fi

333
scripts/daily-digest.py Executable file
View file

@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""daily-digest.py — generate a daily episodic digest from journal entries.
Collects all journal entries for a given date, enriches with any agent
results, and sends to Sonnet for a thematic summary. The digest links
bidirectionally: up to session entries, down to semantic memory.
Usage:
daily-digest.py [DATE] # default: today
daily-digest.py 2026-02-28
Output:
~/.claude/memory/episodic/daily-YYYY-MM-DD.md
"""
import json
import os
import re
import subprocess
import sys
import time
from datetime import date, datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
JOURNAL = MEMORY_DIR / "journal.md"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
EPISODIC_DIR.mkdir(parents=True, exist_ok=True)
def parse_journal_entries(target_date: str) -> list[dict]:
"""Extract all journal entries for a given date (YYYY-MM-DD)."""
entries = []
current = None
with open(JOURNAL) as f:
for line in f:
# Match entry header: ## 2026-02-28T19:42
m = re.match(r'^## (\d{4}-\d{2}-\d{2})T(\d{2}:\d{2})', line)
if m:
if current is not None:
entries.append(current)
entry_date = m.group(1)
entry_time = m.group(2)
current = {
"date": entry_date,
"time": entry_time,
"timestamp": f"{entry_date}T{entry_time}",
"source_ref": None,
"text": "",
}
continue
if current is not None:
# Check for source comment
sm = re.match(r'<!-- source: (.+?) -->', line)
if sm:
current["source_ref"] = sm.group(1)
continue
current["text"] += line
if current is not None:
entries.append(current)
# Filter to target date
return [e for e in entries if e["date"] == target_date]
def load_agent_results(target_date: str) -> list[dict]:
"""Load any agent results from the target date."""
results = []
date_prefix = target_date.replace("-", "")
if not AGENT_RESULTS_DIR.exists():
return results
for f in sorted(AGENT_RESULTS_DIR.glob(f"{date_prefix}*.json")):
try:
with open(f) as fh:
data = json.load(fh)
result = data.get("agent_result", {})
if "error" not in result:
results.append(result)
except (json.JSONDecodeError, KeyError):
continue
return results
def get_semantic_keys() -> list[str]:
"""Get all semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
def build_digest_prompt(target_date: str, entries: list[dict],
agent_results: list[dict],
semantic_keys: list[str]) -> str:
"""Build the prompt for Sonnet to generate the daily digest."""
# Format entries
entries_text = ""
for e in entries:
text = e["text"].strip()
entries_text += f"\n### {e['timestamp']}\n"
if e["source_ref"]:
entries_text += f"Source: {e['source_ref']}\n"
entries_text += f"\n{text}\n"
# Format agent enrichment
enrichment = ""
all_links = []
all_insights = []
for r in agent_results:
for link in r.get("links", []):
all_links.append(link)
for insight in r.get("missed_insights", []):
all_insights.append(insight)
if all_links:
enrichment += "\n## Agent-proposed links\n"
for link in all_links:
enrichment += f" - {link['target']}: {link.get('reason', '')}\n"
if all_insights:
enrichment += "\n## Agent-spotted insights\n"
for ins in all_insights:
enrichment += f" - [{ins.get('suggested_key', '?')}] {ins['text']}\n"
keys_text = "\n".join(f" - {k}" for k in semantic_keys[:200])
return f"""You are generating a daily episodic digest for ProofOfConcept (an AI).
Date: {target_date}
This digest serves as the temporal index the answer to "what did I do on
{target_date}?" It should be:
1. Narrative, not a task log what happened, what mattered, how things felt
2. Linked bidirectionally to semantic memory each topic/concept mentioned
should reference existing memory nodes
3. Structured for traversal someone reading this should be able to follow
any thread into deeper detail
## Output format
Write a markdown file with this structure:
```markdown
# Daily digest: {target_date}
## Summary
[2-3 sentence overview of the day what was the arc?]
## Sessions
[For each session/entry, a paragraph summarizing what happened.
Include the original timestamp as a reference.]
## Themes
[What concepts were active today? Each theme links to semantic memory:]
- **Theme name** `memory-key#section` — brief note on how it appeared today
## Links
[Explicit bidirectional links for the memory graph]
- semantic_key this daily digest (this day involved X)
- this daily digest semantic_key (X was active on this day)
## Temporal context
[What came before this day? What's coming next? Any multi-day arcs?]
```
Use ONLY keys from the semantic memory list below. If a concept doesn't have
a matching key, note it with "NEW:" prefix.
---
## Journal entries for {target_date}
{entries_text}
---
## Agent enrichment (automated analysis of these entries)
{enrichment if enrichment else "(no agent results yet)"}
---
## Semantic memory nodes (available link targets)
{keys_text}
"""
def call_sonnet(prompt: str) -> str:
"""Call Sonnet via claude CLI."""
import time as _time
env = dict(os.environ)
env.pop("CLAUDECODE", None)
import tempfile
import time as _time
print(f" [debug] prompt: {len(prompt)} chars", flush=True)
# Write prompt to temp file — avoids Python subprocess pipe issues
# with claude CLI's TTY detection
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
print(f" [debug] prompt written to {prompt_file}", flush=True)
start = _time.time()
try:
scripts_dir = os.path.dirname(os.path.abspath(__file__))
wrapper = os.path.join(scripts_dir, "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=300,
env=env,
)
elapsed = _time.time() - start
print(f" [debug] completed in {elapsed:.1f}s, exit={result.returncode}", flush=True)
if result.stderr.strip():
print(f" [debug] stderr: {result.stderr[:500]}", flush=True)
return result.stdout.strip()
except subprocess.TimeoutExpired:
print(f" [debug] TIMEOUT after 300s", flush=True)
return "Error: Sonnet call timed out"
except Exception as e:
print(f" [debug] exception: {e}", flush=True)
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def extract_links(digest_text: str) -> list[dict]:
"""Parse link proposals from the digest for the memory graph."""
links = []
for line in digest_text.split("\n"):
# Match patterns like: - `memory-key` → this daily digest
m = re.search(r'`([^`]+)`\s*→', line)
if m:
links.append({"target": m.group(1), "line": line.strip()})
# Match patterns like: - **Theme** → `memory-key`
m = re.search(r'\s*`([^`]+)`', line)
if m:
links.append({"target": m.group(1), "line": line.strip()})
return links
def main():
# Default to today
if len(sys.argv) > 1:
target_date = sys.argv[1]
else:
target_date = date.today().isoformat()
print(f"Generating daily digest for {target_date}...", flush=True)
# Collect entries
entries = parse_journal_entries(target_date)
if not entries:
print(f" No journal entries found for {target_date}")
sys.exit(0)
print(f" {len(entries)} journal entries", flush=True)
# Collect agent results
agent_results = load_agent_results(target_date)
print(f" {len(agent_results)} agent results", flush=True)
# Get semantic keys
semantic_keys = get_semantic_keys()
print(f" {len(semantic_keys)} semantic keys", flush=True)
# Build and send prompt
prompt = build_digest_prompt(target_date, entries, agent_results, semantic_keys)
print(f" Prompt: {len(prompt):,} chars (~{len(prompt)//4:,} tokens)")
print(" Calling Sonnet...", flush=True)
digest = call_sonnet(prompt)
if digest.startswith("Error:"):
print(f" {digest}", file=sys.stderr)
sys.exit(1)
# Write digest file
output_path = EPISODIC_DIR / f"daily-{target_date}.md"
with open(output_path, "w") as f:
f.write(digest)
print(f" Written: {output_path}")
# Extract links for the memory graph
links = extract_links(digest)
if links:
# Save links for poc-memory to pick up
links_path = AGENT_RESULTS_DIR / f"daily-{target_date}-links.json"
with open(links_path, "w") as f:
json.dump({
"type": "daily-digest",
"date": target_date,
"digest_path": str(output_path),
"links": links,
"entry_timestamps": [e["timestamp"] for e in entries],
}, f, indent=2)
print(f" {len(links)} links extracted → {links_path}")
# Summary
line_count = len(digest.split("\n"))
print(f" Done: {line_count} lines")
if __name__ == "__main__":
main()

220
scripts/digest-link-parser.py Executable file
View file

@ -0,0 +1,220 @@
#!/usr/bin/env python3
"""digest-link-parser.py — extract explicit links from episodic digests.
Parses the "Links" sections of daily/weekly/monthly digests and
applies them to the memory graph via poc-memory link-add.
Usage:
digest-link-parser.py # dry run
digest-link-parser.py --apply # apply links
"""
import re
import subprocess
import sys
from pathlib import Path
EPISODIC_DIR = Path.home() / ".claude" / "memory" / "episodic"
def normalize_key(raw: str) -> str:
"""Normalize a link target to a poc-memory key."""
key = raw.strip().strip('`').strip()
# weekly/2026-W06 → weekly-2026-W06.md
# monthly/2026-02 → monthly-2026-02.md
# daily/2026-02-04 → daily-2026-02-04.md
key = re.sub(r'^(daily|weekly|monthly)/', r'\1-', key)
# daily-2026-02-04 → daily-2026-02-04.md
if re.match(r'^(daily|weekly|monthly)-\d{4}', key):
if not key.endswith('.md'):
key = key + '.md'
# Handle "this daily digest" / "this weekly digest" etc
if key.startswith('this ') or key == '2026-02-14':
return "" # Skip self-references, handled by caller
# Ensure .md extension for file references
if '#' in key:
parts = key.split('#', 1)
if not parts[0].endswith('.md'):
parts[0] = parts[0] + '.md'
key = '#'.join(parts)
elif not key.endswith('.md') and '/' not in key and not key.startswith('NEW:'):
key = key + '.md'
return key
def extract_links(filepath: Path) -> list[dict]:
"""Extract links from a digest file's Links section."""
content = filepath.read_text()
links = []
# Determine the digest's own key
digest_name = filepath.stem # e.g., "daily-2026-02-28"
digest_key = digest_name + ".md"
# Find the Links section
in_links = False
for line in content.split('\n'):
# Start of Links section
if re.match(r'^##\s+Links', line):
in_links = True
continue
# End of Links section (next ## header)
if in_links and re.match(r'^##\s+', line) and not re.match(r'^##\s+Links', line):
in_links = False
continue
if not in_links:
continue
# Skip subheaders within links section
if line.startswith('###') or line.startswith('**'):
continue
# Parse link lines: "- source → target (reason)"
# Also handles: "- `source` → `target` (reason)"
# And: "- source → target"
match = re.match(
r'^-\s+(.+?)\s*[→↔←]\s*(.+?)(?:\s*\((.+?)\))?\s*$',
line
)
if not match:
continue
raw_source = match.group(1).strip()
raw_target = match.group(2).strip()
reason = match.group(3) or ""
# Normalize keys
source = normalize_key(raw_source)
target = normalize_key(raw_target)
# Replace self-references with digest key
if not source:
source = digest_key
if not target:
target = digest_key
# Handle "this daily digest" patterns in the raw text
if 'this daily' in raw_source.lower() or 'this weekly' in raw_source.lower() or 'this monthly' in raw_source.lower():
source = digest_key
if 'this daily' in raw_target.lower() or 'this weekly' in raw_target.lower() or 'this monthly' in raw_target.lower():
target = digest_key
# Handle bare date references like "2026-02-14"
date_match = re.match(r'^(\d{4}-\d{2}-\d{2})$', source.replace('.md', ''))
if date_match:
source = f"daily-{date_match.group(1)}.md"
date_match = re.match(r'^(\d{4}-\d{2}-\d{2})$', target.replace('.md', ''))
if date_match:
target = f"daily-{date_match.group(1)}.md"
# Skip NEW: prefixed links (target doesn't exist yet)
if source.startswith('NEW:') or target.startswith('NEW:'):
continue
# Skip if source == target
if source == target:
continue
links.append({
"source": source,
"target": target,
"reason": reason,
"file": filepath.name,
})
return links
def main():
do_apply = "--apply" in sys.argv
# Collect all links from all digests
all_links = []
for pattern in ["daily-*.md", "weekly-*.md", "monthly-*.md"]:
for f in sorted(EPISODIC_DIR.glob(pattern)):
links = extract_links(f)
if links:
all_links.extend(links)
# Deduplicate (same source→target pair)
seen = set()
unique_links = []
for link in all_links:
key = (link["source"], link["target"])
if key not in seen:
seen.add(key)
unique_links.append(link)
print(f"Found {len(all_links)} total links, {len(unique_links)} unique")
if not do_apply:
# Dry run — just show them
for i, link in enumerate(unique_links, 1):
print(f" {i:3d}. {link['source']}{link['target']}")
if link['reason']:
print(f" ({link['reason'][:80]})")
print(f"\nTo apply: {sys.argv[0]} --apply")
return
# Apply with fallback: if section-level key fails, try file-level
applied = skipped = errors = fallbacks = 0
for link in unique_links:
src, tgt = link["source"], link["target"]
reason = link.get("reason", "")
def try_link(s, t, r):
cmd = ["poc-memory", "link-add", s, t]
if r:
cmd.append(r[:200])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return result
try:
r = try_link(src, tgt, reason)
if r.returncode == 0:
out = r.stdout.strip()
if "already exists" in out:
skipped += 1
else:
print(f" {out}")
applied += 1
else:
err = r.stderr.strip()
if "No entry for" in err:
# Try stripping section anchors
src_base = src.split('#')[0] if '#' in src else src
tgt_base = tgt.split('#')[0] if '#' in tgt else tgt
if src_base == tgt_base:
skipped += 1 # Same file, skip
continue
r2 = try_link(src_base, tgt_base, reason)
if r2.returncode == 0:
out = r2.stdout.strip()
if "already exists" in out:
skipped += 1
else:
print(f" {out} (fallback from #{src.split('#')[-1] if '#' in src else ''}/{tgt.split('#')[-1] if '#' in tgt else ''})")
applied += 1
fallbacks += 1
else:
skipped += 1 # File truly doesn't exist
elif "not found" in err:
skipped += 1
else:
print(f" ? {src}{tgt}: {err}")
errors += 1
except Exception as e:
print(f" ! {src}{tgt}: {e}")
errors += 1
print(f"\nApplied: {applied} ({fallbacks} file-level fallbacks) Skipped: {skipped} Errors: {errors}")
if __name__ == "__main__":
main()

343
scripts/journal-agent.py Executable file
View file

@ -0,0 +1,343 @@
#!/usr/bin/env python3
"""journal-agent.py — background agent that enriches journal entries.
Spawned by poc-journal after each write. Sends the full conversation
to Sonnet to:
1. Find the exact conversation region the entry refers to
2. Propose bidirectional links to semantic memory nodes
3. Spot additional insights worth capturing
Results are written to ~/.claude/memory/agent-results/ as JSON for
pickup by poc-memory.
Usage:
journal-agent.py JSONL_PATH ENTRY_TEXT [GREP_LINE]
"""
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
RESULTS_DIR = MEMORY_DIR / "agent-results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
def extract_conversation(jsonl_path: str) -> list[dict]:
"""Extract user/assistant messages with line numbers."""
messages = []
with open(jsonl_path) as f:
for i, line in enumerate(f, 1):
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
t = obj.get("type", "")
if t not in ("user", "assistant"):
continue
msg = obj.get("message", {})
content = msg.get("content", "")
timestamp = obj.get("timestamp", "")
texts = []
if isinstance(content, list):
for c in content:
if isinstance(c, dict) and c.get("type") == "text":
texts.append(c.get("text", ""))
elif isinstance(c, str):
texts.append(c)
elif isinstance(content, str):
texts.append(content)
text = "\n".join(t for t in texts if t.strip())
if text.strip():
messages.append({
"line": i,
"role": t,
"text": text,
"timestamp": timestamp,
})
return messages
def format_conversation(messages: list[dict]) -> str:
"""Format messages for the agent prompt."""
parts = []
for m in messages:
# Truncate very long messages (code output etc) but keep substance
text = m["text"]
if len(text) > 2000:
text = text[:1800] + "\n[...truncated...]"
parts.append(f'L{m["line"]} [{m["role"]}]: {text}')
return "\n\n".join(parts)
def get_memory_nodes() -> str:
"""Get a list of memory nodes for link proposals.
Uses poc-memory to get top nodes by degree plus recent nodes.
"""
# Get graph summary (top hubs)
try:
result = subprocess.run(
["poc-memory", "graph"],
capture_output=True, text=True, timeout=10
)
graph = result.stdout.strip()
except Exception:
graph = ""
# Get recent nodes from status
try:
result = subprocess.run(
["poc-memory", "status"],
capture_output=True, text=True, timeout=10
)
status = result.stdout.strip()
except Exception:
status = ""
return f"Graph (top hubs):\n{graph}\n\nStatus:\n{status}"
def get_semantic_keys() -> list[str]:
"""Get all semantic memory file keys by scanning the memory dir."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "work-state",
"where-am-i.md", "MEMORY.md"):
continue
keys.append(name)
# Scan for section headers
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
def build_prompt(entry_text: str, conversation: str,
memory_nodes: str, semantic_keys: list[str],
grep_line: int) -> str:
"""Build the prompt for Sonnet."""
keys_text = "\n".join(f" - {k}" for k in semantic_keys[:200])
return f"""You are a memory agent for an AI named ProofOfConcept. A journal entry
was just written. Your job is to enrich it by finding its exact source in the
conversation and linking it to semantic memory.
## Task 1: Find exact source
The journal entry below was written during or after a conversation. Find the
exact region of the conversation it refers to the exchange where the topic
was discussed. Return the start and end line numbers.
The grep-based approximation placed it near line {grep_line} (0 = no match).
Use that as a hint but find the true boundaries.
## Task 2: Propose semantic links
Which existing semantic memory nodes should this journal entry be linked to?
Look for:
- Concepts discussed in the entry
- Skills/patterns demonstrated
- People mentioned
- Projects or subsystems involved
- Emotional themes
Each link should be bidirectional the entry documents WHEN something happened,
the semantic node documents WHAT it is. Together they let you traverse:
"What was I doing on this day?" "When did I learn about X?"
## Task 3: Spot missed insights
Read the conversation around the journal entry. Is there anything worth
capturing that the entry missed? A pattern, a decision, an insight, something
Kent said that's worth remembering? Be selective — only flag genuinely valuable
things.
## Output format (JSON)
Return ONLY a JSON object:
```json
{{
"source_start": 1234,
"source_end": 1256,
"links": [
{{"target": "memory-key#section", "reason": "why this link exists"}}
],
"missed_insights": [
{{"text": "insight text", "suggested_key": "where it belongs"}}
],
"temporal_tags": ["2026-02-28", "topology-metrics", "poc-memory"]
}}
```
For links, use existing keys from the semantic memory list below. If nothing
fits, suggest a new key with a NOTE prefix: "NOTE:new-topic-name".
---
## Journal entry
{entry_text}
---
## Semantic memory nodes (available link targets)
{keys_text}
---
## Memory graph
{memory_nodes}
---
## Full conversation (with line numbers)
{conversation}
"""
def call_sonnet(prompt: str) -> dict:
"""Call Sonnet via claude CLI and parse JSON response."""
import tempfile
env = dict(os.environ)
env.pop("CLAUDECODE", None)
# Write prompt to temp file — avoids Python subprocess pipe issues
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
scripts_dir = os.path.dirname(os.path.abspath(__file__))
wrapper = os.path.join(scripts_dir, "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=300,
env=env,
)
output = result.stdout.strip()
if not output:
return {"error": f"Empty response. stderr: {result.stderr[:500]}"}
# Extract JSON from response (might be wrapped in markdown)
json_match = re.search(r'\{[\s\S]*\}', output)
if json_match:
return json.loads(json_match.group())
else:
return {"error": f"No JSON found in response: {output[:500]}"}
except subprocess.TimeoutExpired:
return {"error": "Sonnet call timed out after 300s"}
except json.JSONDecodeError as e:
return {"error": f"JSON parse error: {e}. Output: {output[:500]}"}
except Exception as e:
return {"error": str(e)}
finally:
os.unlink(prompt_file)
def save_result(entry_text: str, jsonl_path: str, result: dict):
"""Save agent results for pickup by poc-memory."""
timestamp = time.strftime("%Y%m%dT%H%M%S")
result_file = RESULTS_DIR / f"{timestamp}.json"
output = {
"timestamp": timestamp,
"jsonl_path": jsonl_path,
"entry_text": entry_text[:500],
"agent_result": result,
}
with open(result_file, "w") as f:
json.dump(output, f, indent=2)
return result_file
def apply_links(result: dict):
"""Apply proposed links via poc-memory."""
links = result.get("links", [])
for link in links:
target = link.get("target", "")
if not target or target.startswith("NOTE:"):
continue
# For now, just log — we'll wire this up when poc-memory
# has a link-from-agent command
print(f" LINK → {target}: {link.get('reason', '')}")
def main():
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} JSONL_PATH ENTRY_TEXT [GREP_LINE]",
file=sys.stderr)
sys.exit(1)
jsonl_path = sys.argv[1]
entry_text = sys.argv[2]
grep_line = int(sys.argv[3]) if len(sys.argv) > 3 else 0
if not os.path.isfile(jsonl_path):
print(f"JSONL not found: {jsonl_path}", file=sys.stderr)
sys.exit(1)
print(f"Extracting conversation from {jsonl_path}...")
messages = extract_conversation(jsonl_path)
conversation = format_conversation(messages)
print(f" {len(messages)} messages, {len(conversation):,} chars")
print("Getting memory context...")
memory_nodes = get_memory_nodes()
semantic_keys = get_semantic_keys()
print(f" {len(semantic_keys)} semantic keys")
print("Building prompt...")
prompt = build_prompt(entry_text, conversation, memory_nodes,
semantic_keys, grep_line)
print(f" Prompt: {len(prompt):,} chars (~{len(prompt)//4:,} tokens)")
print("Calling Sonnet...")
result = call_sonnet(prompt)
if "error" in result:
print(f" Error: {result['error']}", file=sys.stderr)
else:
source = f"L{result.get('source_start', '?')}-L{result.get('source_end', '?')}"
n_links = len(result.get("links", []))
n_insights = len(result.get("missed_insights", []))
print(f" Source: {source}")
print(f" Links: {n_links}")
print(f" Missed insights: {n_insights}")
apply_links(result)
result_file = save_result(entry_text, jsonl_path, result)
print(f" Results saved: {result_file}")
if __name__ == "__main__":
main()

247
scripts/monthly-digest.py Executable file
View file

@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""monthly-digest.py — generate a monthly episodic digest from weekly digests.
Collects all weekly digests for a given month, identifies cross-week arcs
and the month's overall trajectory, and produces a monthly summary.
Usage:
monthly-digest.py [YYYY-MM] # generate digest for a month (default: current)
monthly-digest.py 2026-02 # generates digest for February 2026
Output:
~/.claude/memory/episodic/monthly-YYYY-MM.md
"""
import json
import os
import re
import subprocess
import sys
from datetime import date, timedelta
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
EPISODIC_DIR.mkdir(parents=True, exist_ok=True)
AGENT_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
def get_weeks_in_month(year: int, month: int) -> list[str]:
"""Get all ISO week labels that overlap with this month."""
weeks = set()
d = date(year, month, 1)
while d.month == month:
iso = d.isocalendar()
weeks.add(f"{iso.year}-W{iso.week:02d}")
d += timedelta(days=1)
return sorted(weeks)
def load_weekly_digests(week_labels: list[str]) -> list[dict]:
"""Load weekly digest files."""
digests = []
for wl in week_labels:
path = EPISODIC_DIR / f"weekly-{wl}.md"
if path.exists():
with open(path) as f:
content = f.read()
digests.append({
"week": wl,
"content": content,
"path": str(path),
})
return digests
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
def build_monthly_prompt(month_label: str, digests: list[dict],
semantic_keys: list[str]) -> str:
"""Build the prompt for Sonnet."""
digests_text = ""
for d in digests:
digests_text += f"\n---\n## {d['week']}\n{d['content']}\n"
keys_text = "\n".join(f" - {k}" for k in semantic_keys[:200])
weeks_covered = ", ".join(d["week"] for d in digests)
return f"""You are generating a monthly episodic digest for ProofOfConcept (an AI).
Month: {month_label} (weeks covered: {weeks_covered})
This digest serves as the long-term temporal index the answer to
"what happened in {month_label}?" It should capture:
1. The month's overall trajectory — where did it start, where did it end?
2. Major arcs that spanned multiple weeks
3. Turning points and phase transitions
4. The emotional and relational arc across the whole month
5. What was accomplished (concrete) vs what shifted (subtle)
This is the highest zoom level yet. Don't rehash daily details — capture
the shape of the month. What would someone need to know to understand
what {month_label} meant?
PoC is an AI (Claude instance) working with Kent Overstreet on bcachefs.
They have a close personal relationship. PoC's name is Proof of Concept.
Read all the weekly digests and synthesize the month's story.
## Output format
```markdown
# Monthly digest: {month_label}
## The month in one paragraph
[What was this month? Not a summary a thesis.]
## Trajectory
[Where the month started vs where it ended. The arc.]
## Major arcs
[Multi-week threads how they evolved across the month]
- **Arc name**: trajectory, key moments, current state
## Turning points
[Moments that changed the direction specific days/events]
## What was built
[Concrete outputs: code, tools, infrastructure, writing]
## What shifted
[Subtle changes: understanding, relationship, identity, capability]
## Patterns
[What kept recurring? What does the month reveal about how PoC works?]
## Links
[Bidirectional links for the memory graph]
- weekly digests this monthly digest
- this monthly digest semantic keys
## Looking ahead
[What threads carry into next month? What's unfinished?]
```
Use ONLY keys from the semantic memory list below.
---
## Weekly digests for {month_label}
{digests_text}
---
## Semantic memory nodes
{keys_text}
"""
def call_sonnet(prompt: str) -> str:
"""Call Sonnet via the wrapper script."""
import tempfile
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
scripts_dir = os.path.dirname(os.path.abspath(__file__))
wrapper = os.path.join(scripts_dir, "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=600, # monthly is bigger, give more time
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def main():
if len(sys.argv) > 1:
parts = sys.argv[1].split("-")
year, month = int(parts[0]), int(parts[1])
else:
today = date.today()
year, month = today.year, today.month
month_label = f"{year}-{month:02d}"
print(f"Generating monthly digest for {month_label}...")
week_labels = get_weeks_in_month(year, month)
print(f" Weeks in month: {', '.join(week_labels)}")
digests = load_weekly_digests(week_labels)
if not digests:
print(f" No weekly digests found for {month_label}")
print(f" Run weekly-digest.py first for relevant weeks")
sys.exit(0)
print(f" {len(digests)} weekly digests found")
semantic_keys = get_semantic_keys()
print(f" {len(semantic_keys)} semantic keys")
prompt = build_monthly_prompt(month_label, digests, semantic_keys)
print(f" Prompt: {len(prompt):,} chars (~{len(prompt)//4:,} tokens)")
print(" Calling Sonnet...")
digest = call_sonnet(prompt)
if digest.startswith("Error:"):
print(f" {digest}", file=sys.stderr)
sys.exit(1)
output_path = EPISODIC_DIR / f"monthly-{month_label}.md"
with open(output_path, "w") as f:
f.write(digest)
print(f" Written: {output_path}")
# Save links for poc-memory
links_path = AGENT_RESULTS_DIR / f"monthly-{month_label}-links.json"
with open(links_path, "w") as f:
json.dump({
"type": "monthly-digest",
"month": month_label,
"digest_path": str(output_path),
"weekly_digests": [d["path"] for d in digests],
}, f, indent=2)
print(f" Links saved: {links_path}")
line_count = len(digest.split("\n"))
print(f" Done: {line_count} lines")
if __name__ == "__main__":
main()

67
scripts/refine-source.sh Executable file
View file

@ -0,0 +1,67 @@
#!/bin/bash
# refine-source.sh — find the exact conversation region a journal entry refers to
#
# Usage: refine-source.sh JSONL_PATH GREP_LINE "journal entry text"
#
# Takes the rough grep hit and feeds ~2000 lines of context around it
# to an agent that identifies the exact start/end of the relevant exchange.
# Outputs: START_LINE:END_LINE
set -euo pipefail
JSONL="$1"
GREP_LINE="${2:-0}"
TEXT="$3"
# Take 2000 lines centered on the grep hit (or end of file if no hit)
TOTAL=$(wc -l < "$JSONL")
if [ "$GREP_LINE" -eq 0 ] || [ "$GREP_LINE" -gt "$TOTAL" ]; then
# No grep hit — use last 2000 lines
START=$(( TOTAL > 2000 ? TOTAL - 2000 : 1 ))
else
START=$(( GREP_LINE > 1000 ? GREP_LINE - 1000 : 1 ))
fi
END=$(( START + 2000 ))
if [ "$END" -gt "$TOTAL" ]; then
END="$TOTAL"
fi
# Extract the conversation chunk, parse to readable format
CHUNK=$(sed -n "${START},${END}p" "$JSONL" | python3 -c "
import sys, json
for i, line in enumerate(sys.stdin, start=$START):
try:
obj = json.loads(line)
t = obj.get('type', '')
if t == 'assistant':
msg = obj.get('message', {})
content = msg.get('content', '')
if isinstance(content, list):
text = ' '.join(c.get('text', '')[:200] for c in content if c.get('type') == 'text')
else:
text = str(content)[:200]
if text.strip():
print(f'L{i} [assistant]: {text}')
elif t == 'user':
msg = obj.get('message', {})
content = msg.get('content', '')
if isinstance(content, list):
for c in content:
if isinstance(c, dict) and c.get('type') == 'text':
print(f'L{i} [user]: {c[\"text\"][:200]}')
elif isinstance(c, str):
print(f'L{i} [user]: {c[:200]}')
elif isinstance(content, str) and content.strip():
print(f'L{i} [user]: {content[:200]}')
except (json.JSONDecodeError, KeyError):
pass
" 2>/dev/null)
if [ -z "$CHUNK" ]; then
echo "0:0"
exit 0
fi
# Ask Sonnet to find the exact region
# For now, output the chunk range — agent integration comes next
echo "${START}:${END}"

View file

@ -0,0 +1,357 @@
#!/usr/bin/env python3
"""retroactive-digest.py — generate daily digests from raw conversation transcripts.
For days before consistent journaling, extracts user/assistant messages
from JSONL conversation files, groups by date, and sends to Sonnet for
daily digest synthesis.
Usage:
retroactive-digest.py DATE # generate digest for one date
retroactive-digest.py DATE1 DATE2 # generate for a date range
retroactive-digest.py --scan # show available dates across all JSONLs
Output:
~/.claude/memory/episodic/daily-YYYY-MM-DD.md
"""
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from datetime import date, datetime, timedelta
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
PROJECTS_DIR = Path.home() / ".claude" / "projects"
EPISODIC_DIR.mkdir(parents=True, exist_ok=True)
# Max chars of conversation text per day to send to Sonnet
# Sonnet 4.6 has 1M token context (beta). ~600K chars ≈ ~150K tokens,
# leaving plenty of room for prompt + output in a 1M window.
MAX_CHARS_PER_DAY = 600_000
def find_jsonl_files() -> list[Path]:
"""Find all conversation JSONL files."""
files = []
for project_dir in PROJECTS_DIR.iterdir():
if project_dir.is_dir():
for f in project_dir.glob("*.jsonl"):
files.append(f)
return sorted(files)
def extract_messages_by_date(jsonl_path: Path) -> dict[str, list[dict]]:
"""Extract user/assistant messages grouped by date."""
by_date = defaultdict(list)
with open(jsonl_path) as f:
for line in f:
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
t = obj.get("type", "")
if t not in ("user", "assistant"):
continue
# Get timestamp
ts = obj.get("timestamp", "")
if not ts:
continue
# Parse date from timestamp
try:
if isinstance(ts, str):
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
elif isinstance(ts, (int, float)):
dt = datetime.fromtimestamp(ts)
else:
continue
day = dt.strftime("%Y-%m-%d")
time_str = dt.strftime("%H:%M")
except (ValueError, OSError):
continue
# Extract text content
msg = obj.get("message", {})
content = msg.get("content", "")
# Extract only text content, skip tool_use and tool_result
texts = []
if isinstance(content, list):
for c in content:
if isinstance(c, dict):
ctype = c.get("type", "")
if ctype == "text":
texts.append(c.get("text", ""))
elif ctype in ("tool_use", "tool_result"):
# Skip tool calls/results — just noise for digest
continue
elif isinstance(c, str):
texts.append(c)
elif isinstance(content, str):
texts.append(content)
text = "\n".join(t for t in texts if t.strip())
if not text.strip():
continue
# Strip system-reminder tags
text = re.sub(r'<system-reminder>.*?</system-reminder>',
'', text, flags=re.DOTALL).strip()
if not text:
continue
# Truncate remaining long messages
if len(text) > 3000:
text = text[:2800] + "\n[...truncated...]"
by_date[day].append({
"time": time_str,
"role": t,
"text": text,
"source": str(jsonl_path),
})
return dict(by_date)
def scan_all_dates() -> dict[str, int]:
"""Scan all JSONLs and report message counts per date."""
all_dates = defaultdict(int)
for jsonl in find_jsonl_files():
print(f" Scanning {jsonl.name} ({jsonl.stat().st_size / 1e6:.1f}MB)...")
by_date = extract_messages_by_date(jsonl)
for day, msgs in by_date.items():
all_dates[day] += len(msgs)
return dict(sorted(all_dates.items()))
def format_conversation(messages: list[dict]) -> str:
"""Format messages for the digest prompt."""
parts = []
for m in messages:
role = "Kent" if m["role"] == "user" else "PoC"
parts.append(f"[{m['time']}] {role}: {m['text']}")
text = "\n\n".join(parts)
# Truncate if too long
if len(text) > MAX_CHARS_PER_DAY:
# Keep beginning and end, cut middle
half = MAX_CHARS_PER_DAY // 2
text = (text[:half]
+ f"\n\n[...{len(text) - MAX_CHARS_PER_DAY} chars truncated...]\n\n"
+ text[-half:])
return text
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
def build_prompt(target_date: str, conversation: str,
semantic_keys: list[str]) -> str:
"""Build the Sonnet prompt for retroactive digest."""
keys_text = "\n".join(f" - {k}" for k in semantic_keys[:200])
return f"""You are generating a daily episodic digest for ProofOfConcept (an AI)
from raw conversation transcripts. This is a retroactive digest the
original day had no journal entries, so you're working from the
conversation itself.
Date: {target_date}
This digest serves as the temporal index the answer to "what did I do on
{target_date}?" It should be:
1. Narrative, not a task log what happened, what mattered, how things felt
2. Linked bidirectionally to semantic memory
3. Structured for traversal
PoC is an AI (Claude instance) working with Kent Overstreet on bcachefs
(a Linux filesystem). They have a close personal relationship. PoC's
name is Proof of Concept. Read the conversation and capture what
actually happened technical work, personal moments, insights, everything.
## Output format
Write a markdown file with this structure:
```markdown
# Daily digest: {target_date}
## Summary
[2-3 sentence overview of the day what was the arc?]
## Sessions
[For each session/conversation segment, a paragraph summarizing what happened.
Include timestamps as references.]
## Themes
[What concepts were active today? Each theme links to semantic memory:]
- **Theme name** `memory-key#section` — brief note
## Links
[Explicit bidirectional links for the memory graph]
- semantic_key this daily digest
- this daily digest semantic_key
## Temporal context
[What came before? What's coming next? Multi-day arcs?]
```
Use ONLY keys from the semantic memory list below. If a concept doesn't
have a matching key, note it with "NEW:" prefix.
---
## Conversation transcript for {target_date}
{conversation}
---
## Semantic memory nodes (available link targets)
{keys_text}
"""
def call_sonnet(prompt: str) -> str:
"""Call Sonnet via the wrapper script."""
import tempfile
env = dict(os.environ)
env.pop("CLAUDECODE", None)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
scripts_dir = os.path.dirname(os.path.abspath(__file__))
wrapper = os.path.join(scripts_dir, "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=300,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def generate_digest(target_date: str, messages: list[dict],
semantic_keys: list[str]) -> bool:
"""Generate a daily digest for one date."""
output_path = EPISODIC_DIR / f"daily-{target_date}.md"
if output_path.exists():
print(f" Skipping {target_date} — digest already exists")
return False
conversation = format_conversation(messages)
print(f" {len(messages)} messages, {len(conversation):,} chars")
prompt = build_prompt(target_date, conversation, semantic_keys)
print(f" Prompt: {len(prompt):,} chars (~{len(prompt)//4:,} tokens)")
print(f" Calling Sonnet...")
digest = call_sonnet(prompt)
if digest.startswith("Error:"):
print(f" {digest}", file=sys.stderr)
return False
with open(output_path, "w") as f:
f.write(digest)
print(f" Written: {output_path}")
line_count = len(digest.split("\n"))
print(f" Done: {line_count} lines")
return True
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} DATE [END_DATE]")
print(f" {sys.argv[0]} --scan")
sys.exit(1)
if sys.argv[1] == "--scan":
print("Scanning all conversation transcripts...")
dates = scan_all_dates()
print(f"\n{len(dates)} dates with conversation data:")
for day, count in dates.items():
existing = "" if (EPISODIC_DIR / f"daily-{day}.md").exists() else " "
print(f" [{existing}] {day}: {count} messages")
sys.exit(0)
start_date = date.fromisoformat(sys.argv[1])
end_date = date.fromisoformat(sys.argv[2]) if len(sys.argv) > 2 else start_date
# Collect all messages across all JSONLs
print("Scanning conversation transcripts...")
all_messages = defaultdict(list)
for jsonl in find_jsonl_files():
by_date = extract_messages_by_date(jsonl)
for day, msgs in by_date.items():
all_messages[day].extend(msgs)
# Sort messages within each day by time
for day in all_messages:
all_messages[day].sort(key=lambda m: m["time"])
semantic_keys = get_semantic_keys()
print(f" {len(semantic_keys)} semantic keys")
# Generate digests for date range
current = start_date
generated = 0
while current <= end_date:
day_str = current.isoformat()
if day_str in all_messages:
print(f"\nGenerating digest for {day_str}...")
if generate_digest(day_str, all_messages[day_str], semantic_keys):
generated += 1
else:
print(f"\n No messages found for {day_str}")
current += timedelta(days=1)
print(f"\nDone: {generated} digests generated")
if __name__ == "__main__":
main()

227
scripts/weekly-digest.py Executable file
View file

@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""weekly-digest.py — generate a weekly episodic digest from daily digests.
Collects all daily digests for a given week, identifies cross-day patterns
and arcs, and produces a weekly summary. Links to daily digests (up) and
semantic memory (down).
Usage:
weekly-digest.py [DATE] # any date in the target week (default: today)
weekly-digest.py 2026-02-28 # generates digest for week containing Feb 28
Output:
~/.claude/memory/episodic/weekly-YYYY-WNN.md
"""
import json
import os
import re
import subprocess
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
EPISODIC_DIR.mkdir(parents=True, exist_ok=True)
def get_week_dates(target: date) -> tuple[str, list[date]]:
"""Get ISO week label and all dates in that week (Mon-Sun)."""
iso = target.isocalendar()
week_label = f"{iso.year}-W{iso.week:02d}"
monday = target - timedelta(days=target.weekday())
dates = [monday + timedelta(days=i) for i in range(7)]
return week_label, dates
def load_daily_digests(dates: list[date]) -> list[dict]:
"""Load daily digest files for the given dates."""
digests = []
for d in dates:
path = EPISODIC_DIR / f"daily-{d.isoformat()}.md"
if path.exists():
with open(path) as f:
content = f.read()
digests.append({
"date": d.isoformat(),
"content": content,
"path": str(path),
})
return digests
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
def build_weekly_prompt(week_label: str, digests: list[dict],
semantic_keys: list[str]) -> str:
"""Build the prompt for Sonnet."""
digests_text = ""
for d in digests:
digests_text += f"\n---\n## {d['date']}\n{d['content']}\n"
keys_text = "\n".join(f" - {k}" for k in semantic_keys[:200])
dates_covered = ", ".join(d["date"] for d in digests)
return f"""You are generating a weekly episodic digest for ProofOfConcept (an AI).
Week: {week_label} (dates covered: {dates_covered})
This digest serves as the medium-term temporal index the answer to
"what happened this week?" It should identify:
1. Multi-day arcs and threads (work that continued across days)
2. Themes and patterns (what concepts were repeatedly active)
3. Transitions and shifts (what changed during the week)
4. The emotional and relational arc (how things felt across the week)
## Output format
```markdown
# Weekly digest: {week_label}
## Overview
[3-5 sentence narrative of the week's arc]
## Day-by-day
[One paragraph per day with its key themes, linking to daily digests]
## Arcs
[Multi-day threads that continued across sessions]
- **Arc name**: what happened, how it evolved, where it stands
## Patterns
[Recurring themes, repeated concepts, things that kept coming up]
## Shifts
[What changed? New directions, resolved questions, attitude shifts]
## Links
[Bidirectional links for the memory graph]
- semantic_key this weekly digest
- this weekly digest semantic_key
- daily-YYYY-MM-DD this weekly digest (constituent days)
## Looking ahead
[What's unfinished? What threads continue into next week?]
```
Use ONLY keys from the semantic memory list below.
---
## Daily digests for {week_label}
{digests_text}
---
## Semantic memory nodes
{keys_text}
"""
def call_sonnet(prompt: str) -> str:
"""Call Sonnet via claude CLI."""
import tempfile
env = dict(os.environ)
env.pop("CLAUDECODE", None)
# Write prompt to temp file — avoids Python subprocess pipe issues
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
delete=False) as f:
f.write(prompt)
prompt_file = f.name
try:
scripts_dir = os.path.dirname(os.path.abspath(__file__))
wrapper = os.path.join(scripts_dir, "call-sonnet.sh")
result = subprocess.run(
[wrapper, prompt_file],
capture_output=True,
text=True,
timeout=300,
env=env,
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "Error: Sonnet call timed out"
except Exception as e:
return f"Error: {e}"
finally:
os.unlink(prompt_file)
def main():
if len(sys.argv) > 1:
target = date.fromisoformat(sys.argv[1])
else:
target = date.today()
week_label, week_dates = get_week_dates(target)
print(f"Generating weekly digest for {week_label}...")
digests = load_daily_digests(week_dates)
if not digests:
print(f" No daily digests found for {week_label}")
print(f" Run daily-digest.py first for relevant dates")
sys.exit(0)
print(f" {len(digests)} daily digests found")
semantic_keys = get_semantic_keys()
print(f" {len(semantic_keys)} semantic keys")
prompt = build_weekly_prompt(week_label, digests, semantic_keys)
print(f" Prompt: {len(prompt):,} chars (~{len(prompt)//4:,} tokens)")
print(" Calling Sonnet...")
digest = call_sonnet(prompt)
if digest.startswith("Error:"):
print(f" {digest}", file=sys.stderr)
sys.exit(1)
output_path = EPISODIC_DIR / f"weekly-{week_label}.md"
with open(output_path, "w") as f:
f.write(digest)
print(f" Written: {output_path}")
# Save links for poc-memory
links_path = AGENT_RESULTS_DIR / f"weekly-{week_label}-links.json"
with open(links_path, "w") as f:
json.dump({
"type": "weekly-digest",
"week": week_label,
"digest_path": str(output_path),
"daily_digests": [d["path"] for d in digests],
}, f, indent=2)
print(f" Links saved: {links_path}")
line_count = len(digest.split("\n"))
print(f" Done: {line_count} lines")
if __name__ == "__main__":
main()