scripts: use capnp store instead of reading markdown directly

Add store_helpers.py with shared helpers that call poc-memory commands
(list-keys, render, journal-tail) instead of globbing ~/.claude/memory/*.md
and parsing section headers.

All 9 Python scripts updated: get_semantic_keys(), get_topic_file_index(),
get_recent_journal(), parse_journal_entries(), read_journal_range(),
collect_topic_stems(), and file preview rendering now go through the store.

This completes the clean switch — no script reads archived markdown files.
This commit is contained in:
ProofOfConcept 2026-02-28 23:32:47 -05:00
parent f20ea4f827
commit d14710e477
10 changed files with 324 additions and 297 deletions

View file

@ -68,20 +68,22 @@ def get_unique_files(keys: list[str]) -> list[str]:
def build_prompt(files: list[str]) -> str:
"""Build categorization prompt."""
# Read first few lines of each file for context
# Read file previews from the store
file_previews = []
for f in files:
path = MEMORY_DIR / f
if not path.exists():
# Try episodic
path = MEMORY_DIR / "episodic" / f
if path.exists():
content = path.read_text()
# First 5 lines or 300 chars
preview = '\n'.join(content.split('\n')[:5])[:300]
file_previews.append(f" {f}: {preview.replace(chr(10), ' | ')}")
else:
file_previews.append(f" {f}: (file not found)")
try:
r = subprocess.run(
["poc-memory", "render", f],
capture_output=True, text=True, timeout=10
)
content = r.stdout.strip()
if content:
preview = '\n'.join(content.split('\n')[:5])[:300]
file_previews.append(f" {f}: {preview.replace(chr(10), ' | ')}")
else:
file_previews.append(f" {f}: (no content)")
except Exception:
file_previews.append(f" {f}: (render failed)")
previews_text = '\n'.join(file_previews)

View file

@ -64,88 +64,36 @@ def call_sonnet(prompt: str, timeout: int = 600) -> str:
# ---------------------------------------------------------------------------
def get_recent_journal(n_lines: int = 200) -> str:
"""Get last N lines of journal."""
journal = MEMORY_DIR / "journal.md"
if not journal.exists():
return ""
with open(journal) as f:
lines = f.readlines()
return "".join(lines[-n_lines:])
"""Get recent journal entries from the store."""
from store_helpers import get_recent_journal as _get_journal
# n_lines ≈ 50 entries (rough heuristic: ~4 lines per entry)
return _get_journal(n=max(20, n_lines // 4))
def get_topic_file_index() -> dict[str, list[str]]:
"""Build index of topic files and their section headers."""
index = {}
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "MEMORY.md", "where-am-i.md",
"work-queue.md", "search-testing.md"):
continue
sections = []
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
sections.append(line.strip())
except Exception:
pass
index[name] = sections
return index
"""Build index of topic files and their section headers from the store."""
from store_helpers import get_topic_file_index as _get_index
return _get_index()
def get_mem_markers() -> list[dict]:
"""Extract all <!-- mem: --> markers from memory files."""
"""Get relations from the store (replaces mem marker parsing)."""
from store_helpers import get_relations
raw = get_relations()
# Parse list-edges output into marker-like dicts
markers = []
for md in sorted(MEMORY_DIR.glob("*.md")):
if md.name in ("journal.md", "MEMORY.md"):
for line in raw.split('\n'):
line = line.strip()
if not line:
continue
try:
content = md.read_text()
for match in re.finditer(
r'<!-- mem: (.*?) -->', content):
attrs = {}
for part in match.group(1).split():
if '=' in part:
k, v = part.split('=', 1)
attrs[k] = v
attrs['_file'] = md.name
markers.append(attrs)
except Exception:
pass
markers.append({"_raw": line})
return markers
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
"""Get first N chars of each topic file for cross-link scanning."""
parts = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "MEMORY.md", "where-am-i.md",
"work-queue.md", "search-testing.md"):
continue
try:
content = md.read_text()
# Get sections and first paragraph of each
sections = []
current_section = name
current_content = []
for line in content.split('\n'):
if line.startswith("## "):
if current_content:
text = '\n'.join(current_content[:5])
sections.append(f" {current_section}: {text[:200]}")
current_section = line.strip()
current_content = []
elif line.strip():
current_content.append(line.strip())
if current_content:
text = '\n'.join(current_content[:5])
sections.append(f" {current_section}: {text[:200]}")
parts.append(f"\n### {name}\n" + '\n'.join(sections[:15]))
except Exception:
pass
return '\n'.join(parts)
"""Get topic file summaries from the store."""
from store_helpers import get_topic_summaries as _get_summaries
return _get_summaries(max_chars_per_file)
def get_graph_stats() -> str:
@ -274,16 +222,13 @@ def build_crosslink_prompt() -> str:
marker_text = ""
for m in markers:
f = m.get('_file', '?')
mid = m.get('id', '?')
links = m.get('links', '')
marker_text += f" {f}#{mid} → links={links}\n"
marker_text += f" {m.get('_raw', '?')}\n"
return f"""You are the Cross-Link Scanner for ProofOfConcept's memory system.
Your job: find MISSING connections between topic files.
## Existing links (from <!-- mem: --> markers)
## Existing relations (from the memory graph)
{marker_text}
@ -328,15 +273,13 @@ def build_topology_prompt() -> str:
stats = get_graph_stats()
topic_index = get_topic_file_index()
# Get node counts per file from the store
from store_helpers import get_topic_file_index as _get_index
topic_index = _get_index()
file_sizes = ""
for md in sorted(MEMORY_DIR.glob("*.md")):
if md.name in ("journal.md", "MEMORY.md"):
continue
try:
lines = len(md.read_text().split('\n'))
file_sizes += f" {md.name}: {lines} lines\n"
except Exception:
pass
for fname in sorted(topic_index.keys()):
n_sections = len(topic_index[fname])
file_sizes += f" {fname}: {n_sections} sections\n"
return f"""You are the Topology Reporter for ProofOfConcept's memory system.

View file

@ -81,17 +81,9 @@ def get_health() -> dict:
def get_topic_file_index() -> dict[str, list[str]]:
"""Build index of topic files and their section headers."""
index = {}
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
headers = []
for line in md.read_text().split('\n'):
if line.startswith('## '):
slug = re.sub(r'[^a-z0-9-]', '', line[3:].lower().replace(' ', '-'))
headers.append(slug)
index[name] = headers
return index
"""Build index of topic files and their section headers from the store."""
from store_helpers import get_topic_file_index as _get_index
return _get_index()
def get_graph_structure() -> str:
@ -123,12 +115,14 @@ def build_crosslink_prompt(round_num: int) -> str:
graph = get_graph_structure()
status = get_status()
# Read a sample of files for context
# Read a sample of files from the store
from store_helpers import render as _render
file_previews = ""
for f in sorted(MEMORY_DIR.glob("*.md"))[:30]:
content = f.read_text()
preview = '\n'.join(content.split('\n')[:8])[:400]
file_previews += f"\n--- {f.name} ---\n{preview}\n"
for fname in sorted(index.keys())[:30]:
content = _render(fname)
if content:
preview = '\n'.join(content.split('\n')[:8])[:400]
file_previews += f"\n--- {fname} ---\n{preview}\n"
return f"""You are a cross-link discovery agent (round {round_num}).
@ -210,13 +204,13 @@ Output ONLY the JSON array."""
def build_newfile_prompt(round_num: int) -> str:
"""Build prompt for connecting the new split files."""
# Read the new reflection files
# Read the new reflection files from the store
from store_helpers import render as _render
new_files = {}
for name in ['reflections-reading.md', 'reflections-dreams.md', 'reflections-zoom.md',
'verus-proofs.md']:
path = MEMORY_DIR / name
if path.exists():
content = path.read_text()
content = _render(name)
if content:
new_files[name] = content[:2000]
# Read existing files they should connect to
@ -224,9 +218,8 @@ def build_newfile_prompt(round_num: int) -> str:
for name in ['differentiation.md', 'cognitive-modes.md', 'language-theory.md',
'discoveries.md', 'inner-life.md', 'design-context-window.md',
'design-consolidate.md', 'experiments-on-self.md']:
path = MEMORY_DIR / name
if path.exists():
content = path.read_text()
content = _render(name)
if content:
target_files[name] = content[:1500]
graph = get_graph_structure()

View file

@ -55,10 +55,28 @@ def call_sonnet(prompt: str, timeout: int = 600) -> str:
def read_file(path: Path) -> str:
"""Read a file, return empty string if missing."""
"""Read a file, return empty string if missing.
Falls back to the store if the file doesn't exist on disk
(content markdown files have been archived).
"""
if path.exists():
return path.read_text()
return ""
# Try the store — the filename is the key
from store_helpers import render, list_keys
key = path.name
# Gather file-level + section content
all_keys = list_keys()
prefix = f"{key}#"
matching = [k for k in all_keys if k == key or k.startswith(prefix)]
if not matching:
return ""
parts = []
for k in matching:
content = render(k)
if content:
parts.append(content)
return "\n\n".join(parts)
def read_digest(name: str) -> str:
@ -68,25 +86,9 @@ def read_digest(name: str) -> str:
def read_journal_range(start_date: str, end_date: str) -> str:
"""Extract journal entries between two dates."""
journal = MEMORY_DIR / "journal.md"
if not journal.exists():
return ""
content = journal.read_text()
# Extract entries between dates
lines = content.split('\n')
result = []
capturing = False
for line in lines:
if line.startswith('## '):
# Check if this is a date header
if start_date <= line[3:13] <= end_date:
capturing = True
elif capturing and line[3:13] > end_date:
capturing = False
if capturing:
result.append(line)
return '\n'.join(result[-500:]) # Last 500 lines in range
"""Get journal entries between two dates from the store."""
from store_helpers import get_journal_range
return get_journal_range(start_date, end_date)
# ---------------------------------------------------------------------------
@ -382,47 +384,45 @@ def run_task(task: dict, do_apply: bool) -> dict:
result["status"] = "dry_run"
return result
# Apply the content
target_path = MEMORY_DIR / task["target"]
# Apply the content — write directly to the store
target = task["target"]
if task["action"] == "create":
if target_path.exists():
print(f" ! Target already exists: {target_path}")
result["status"] = "skipped"
return result
target_path.write_text(content + "\n")
print(f" + Created: {target_path} ({result['content_lines']} lines)")
# Write each section as a separate node
proc = subprocess.run(
["poc-memory", "write", target],
input=content, capture_output=True, text=True, timeout=30
)
print(f" + Created in store: {target} ({result['content_lines']} lines)")
if proc.stdout.strip():
print(f" {proc.stdout.strip()}")
result["status"] = "applied"
elif task["action"] == "append_section":
if not target_path.exists():
print(f" ! Target doesn't exist: {target_path}")
result["status"] = "error"
return result
existing = target_path.read_text()
# Append with separator
with open(target_path, "a") as f:
f.write("\n\n" + content + "\n")
print(f" + Appended to: {target_path} ({result['content_lines']} lines)")
# Extract section key from content (## header → slug)
header_match = re.match(r'^## (.+)', content)
if header_match:
slug = re.sub(r'[^a-z0-9-]', '',
header_match.group(1).strip().lower().replace(' ', '-'))
key = f"{target}#{slug}"
else:
key = target
proc = subprocess.run(
["poc-memory", "write", key],
input=content, capture_output=True, text=True, timeout=30
)
print(f" + Appended to store: {key} ({result['content_lines']} lines)")
if proc.stdout.strip():
print(f" {proc.stdout.strip()}")
result["status"] = "applied"
elif task["action"] == "update":
# For updates, we save the proposed changes and let the user review
output_path = AGENT_RESULTS_DIR / f"promotion-{task['target']}-{datetime.now().strftime('%Y%m%dT%H%M%S')}.md"
output_path.write_text(f"# Proposed update for {task['target']}\n\n{content}\n")
# For updates, save proposed changes for review
output_path = AGENT_RESULTS_DIR / f"promotion-{target}-{datetime.now().strftime('%Y%m%dT%H%M%S')}.md"
output_path.write_text(f"# Proposed update for {target}\n\n{content}\n")
print(f" ~ Saved proposed update: {output_path}")
result["status"] = "proposed"
# Register new content with poc-memory
if result["status"] == "applied":
try:
subprocess.run(
["poc-memory", "init"],
capture_output=True, text=True, timeout=30
)
except Exception:
pass # Non-critical
return result

View file

@ -23,7 +23,6 @@ from datetime import date, datetime
from pathlib import Path
MEMORY_DIR = Path.home() / ".claude" / "memory"
JOURNAL = MEMORY_DIR / "journal.md"
EPISODIC_DIR = MEMORY_DIR / "episodic"
AGENT_RESULTS_DIR = MEMORY_DIR / "agent-results"
@ -31,41 +30,9 @@ EPISODIC_DIR.mkdir(parents=True, exist_ok=True)
def parse_journal_entries(target_date: str) -> list[dict]:
"""Extract all journal entries for a given date (YYYY-MM-DD)."""
entries = []
current = None
with open(JOURNAL) as f:
for line in f:
# Match entry header: ## 2026-02-28T19:42
m = re.match(r'^## (\d{4}-\d{2}-\d{2})T(\d{2}:\d{2})', line)
if m:
if current is not None:
entries.append(current)
entry_date = m.group(1)
entry_time = m.group(2)
current = {
"date": entry_date,
"time": entry_time,
"timestamp": f"{entry_date}T{entry_time}",
"source_ref": None,
"text": "",
}
continue
if current is not None:
# Check for source comment
sm = re.match(r'<!-- source: (.+?) -->', line)
if sm:
current["source_ref"] = sm.group(1)
continue
current["text"] += line
if current is not None:
entries.append(current)
# Filter to target date
return [e for e in entries if e["date"] == target_date]
"""Get journal entries for a given date from the store."""
from store_helpers import get_journal_entries_by_date
return get_journal_entries_by_date(target_date)
def load_agent_results(target_date: str) -> list[dict]:
@ -90,24 +57,9 @@ def load_agent_results(target_date: str) -> list[dict]:
def get_semantic_keys() -> list[str]:
"""Get all semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
"""Get semantic memory keys from the store."""
from store_helpers import get_semantic_keys as _get_keys
return _get_keys()
def build_digest_prompt(target_date: str, entries: list[dict],

View file

@ -108,26 +108,9 @@ def get_memory_nodes() -> str:
def get_semantic_keys() -> list[str]:
"""Get all semantic memory file keys by scanning the memory dir."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "work-state",
"where-am-i.md", "MEMORY.md"):
continue
keys.append(name)
# Scan for section headers
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
"""Get all semantic memory keys from the store."""
from store_helpers import get_semantic_keys as _get_keys
return _get_keys()
def build_prompt(entry_text: str, conversation: str,

View file

@ -56,24 +56,9 @@ def load_weekly_digests(week_labels: list[str]) -> list[dict]:
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
"""Get semantic memory keys from the store."""
from store_helpers import get_semantic_keys as _get_keys
return _get_keys()
def build_monthly_prompt(month_label: str, digests: list[dict],

View file

@ -155,24 +155,9 @@ def format_conversation(messages: list[dict]) -> str:
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
"""Get semantic memory keys from the store."""
from store_helpers import get_semantic_keys as _get_keys
return _get_keys()
def build_prompt(target_date: str, conversation: str,

199
scripts/store_helpers.py Normal file
View file

@ -0,0 +1,199 @@
"""store_helpers.py — shared helpers for scripts using the capnp store.
All memory content lives in the capnp store (poc-memory). These helpers
replace the old pattern of globbing ~/.claude/memory/*.md and parsing
section headers directly.
"""
import re
import subprocess
from functools import lru_cache
def _run_poc(args: list[str], timeout: int = 30) -> str:
"""Run a poc-memory command and return stdout."""
try:
result = subprocess.run(
["poc-memory"] + args,
capture_output=True, text=True, timeout=timeout
)
return result.stdout.strip()
except Exception:
return ""
def list_keys() -> list[str]:
    """Return every node key printed by ``poc-memory list-keys``.

    The command output is split on newlines; each line is trimmed and
    blank lines are discarded.
    """
    raw = _run_poc(["list-keys"])
    names = []
    for line in raw.split('\n'):
        name = line.strip()
        if name:
            names.append(name)
    return names
def get_semantic_keys() -> list[str]:
    """Return store keys that are neither journal entries nor system files.

    Keys beginning with ``journal.md#`` are dropped, as are the exact keys
    listed in ``excluded`` below. Order follows ``list_keys()``.
    """
    excluded = ("journal.md", "MEMORY.md", "where-am-i.md",
                "work-queue.md", "work-state")
    semantic = []
    for key in list_keys():
        if key.startswith("journal.md#"):
            continue
        if key in excluded:
            continue
        semantic.append(key)
    return semantic
def get_journal_keys(n: int = 0) -> list[str]:
    """Return journal entry keys in descending lexicographic order.

    Journal keys look like ``journal.md#j-2026-02-28t23-19-slug``, so for
    keys of that shape descending string order puts the newest first.

    Args:
        n: When greater than zero, keep only the first ``n`` keys of the
            sorted list (the newest ones); otherwise return all of them.
    """
    journal = sorted(
        (key for key in list_keys() if key.startswith("journal.md#")),
        reverse=True,
    )
    if n > 0:
        return journal[:n]
    return journal
def render(key: str) -> str:
    """Return the output of ``poc-memory render`` for the node ``key``."""
    command = ["render", key]
    return _run_poc(command)
def get_recent_journal(n: int = 50) -> str:
    """Return the rendered text of the ``n`` most recent journal entries.

    Entries are emitted oldest first and separated by a blank line; entries
    that render to an empty string are left out.
    """
    newest_first = get_journal_keys(n)
    rendered = (render(key) for key in reversed(newest_first))
    return "\n\n".join(text for text in rendered if text)
def get_journal_entries_by_date(target_date: str) -> list[dict]:
    """Return the journal entries recorded on ``target_date``.

    Args:
        target_date: Date to select, formatted ``YYYY-MM-DD``.

    Returns:
        One dict per matching entry, in the order produced by
        ``get_journal_keys()``, with the fields ``key``, ``date``, ``time``
        (``HH:MM``), ``timestamp`` (``<date>T<time>``), ``text`` and
        ``source_ref``. ``text`` is the rendered content with its leading
        ``## <timestamp>`` header removed; ``source_ref`` is the value of
        the first ``<!-- source: ... -->`` comment, or ``None`` if absent.
    """
    entries = []
    for key in get_journal_keys():
        content = None
        # Preferred source of the timestamp: the key itself,
        # e.g. journal.md#j-2026-02-28t23-19-slug
        stamp = re.search(r'j-(\d{4}-\d{2}-\d{2})t(\d{2})-(\d{2})', key)
        if not stamp:
            # No timestamp in the key: fall back to the "## <date>T<time>"
            # header of the rendered content. Keep the content so the entry
            # is not rendered (one subprocess call) a second time below.
            content = render(key)
            stamp = re.match(
                r'^## (\d{4}-\d{2}-\d{2})T(\d{2}):(\d{2})', content)
            if not stamp:
                continue

        entry_date = stamp.group(1)
        if entry_date != target_date:
            continue
        entry_time = f"{stamp.group(2)}:{stamp.group(3)}"

        if content is None:
            content = render(key)

        # Parse source ref from content
        source_ref = None
        source = re.search(r'<!-- source: (.+?) -->', content)
        if source:
            source_ref = source.group(1)

        # Strip the header line
        text = re.sub(
            r'^## \d{4}-\d{2}-\d{2}T\d{2}:\d{2}\s*\n?', '', content)

        entries.append({
            "key": key,
            "date": entry_date,
            "time": entry_time,
            "timestamp": f"{entry_date}T{entry_time}",
            "text": text.strip(),
            "source_ref": source_ref,
        })
    return entries
def get_topic_file_index() -> dict[str, list[str]]:
    """Group semantic keys by file.

    Returns:
        A mapping ``{filename: [section_headers]}``. A key of the form
        ``file#section`` contributes the header ``"## section"`` to
        ``file``; a key without ``#`` only ensures the file has an entry.
    """
    index: dict[str, list[str]] = {}
    for key in get_semantic_keys():
        filename, marker, section = key.partition('#')
        headers = index.setdefault(filename, [])
        if marker:
            headers.append(f"## {section}")
    return index
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Return a truncated rendering of each topic file's file-level node.

    Args:
        max_chars_per_file: Longer content is cut to this many characters
            and followed by a ``[...truncated...]`` marker line.

    Returns:
        One ``### <filename>`` section per file, in sorted filename order,
        joined by newlines. Files listed in ``skipped`` and files whose
        file-level node renders empty are omitted.
    """
    skipped = ("journal.md", "MEMORY.md", "where-am-i.md",
               "work-queue.md")
    sections = []
    for filename in sorted(get_topic_file_index()):
        if filename in skipped:
            continue
        body = render(filename)
        if not body:
            continue
        if len(body) > max_chars_per_file:
            body = body[:max_chars_per_file] + "\n[...truncated...]"
        sections.append(f"\n### {filename}\n{body}")
    return '\n'.join(sections)
def get_relations() -> str:
    """Return the raw output of ``poc-memory list-edges``.

    This stands in for the former parsing of mem markers.
    """
    edges = _run_poc(["list-edges"])
    return edges
def get_graph_stats() -> str:
    """Return a report built from ``poc-memory status`` and ``graph``.

    Each command that yields output gets its own titled section; the graph
    section keeps only the first 150 lines. Sections are joined by a
    newline.
    """
    sections = []

    status_out = _run_poc(["status"])
    if status_out:
        sections.append(f"=== poc-memory status ===\n{status_out}")

    graph_out = _run_poc(["graph"])
    if graph_out:
        head = '\n'.join(graph_out.split('\n')[:150])
        sections.append(
            "=== poc-memory graph (first 150 lines) ===\n" + head)

    return '\n'.join(sections)
def get_journal_range(start_date: str, end_date: str) -> str:
    """Return rendered journal entries dated within an inclusive range.

    Args:
        start_date: First date to include, ``YYYY-MM-DD``.
        end_date: Last date to include, ``YYYY-MM-DD``.

    Returns:
        Matching entries, oldest first, separated by a blank line and
        limited to the final 500 lines. Only keys that embed a
        ``j-YYYY-MM-DD`` date are considered; dates are compared as
        strings.
    """
    chunks = []
    for key in reversed(get_journal_keys()):  # oldest first
        found = re.search(r'j-(\d{4}-\d{2}-\d{2})', key)
        if found is None:
            continue
        if not (start_date <= found.group(1) <= end_date):
            continue
        body = render(key)
        if body:
            chunks.append(body)

    # Keep at most the trailing 500 lines; shorter text passes through
    # unchanged because splitting and re-joining on '\n' is lossless.
    lines = "\n\n".join(chunks).split('\n')
    return '\n'.join(lines[-500:])

View file

@ -54,24 +54,9 @@ def load_daily_digests(dates: list[date]) -> list[dict]:
def get_semantic_keys() -> list[str]:
"""Get semantic memory file keys."""
keys = []
for md in sorted(MEMORY_DIR.glob("*.md")):
name = md.name
if name in ("journal.md", "work-queue.md", "MEMORY.md"):
continue
keys.append(name)
try:
with open(md) as f:
for line in f:
if line.startswith("## "):
slug = re.sub(r'[^a-z0-9-]', '',
line[3:].strip().lower().replace(' ', '-'))
if slug:
keys.append(f"{name}#{slug}")
except Exception:
pass
return keys
"""Get semantic memory keys from the store."""
from store_helpers import get_semantic_keys as _get_keys
return _get_keys()
def build_weekly_prompt(week_label: str, digests: list[dict],