Add store_helpers.py with shared helpers that call poc-memory commands (list-keys, render, journal-tail) instead of globbing ~/.claude/memory/*.md and parsing section headers. All 9 Python scripts updated: get_semantic_keys(), get_topic_file_index(), get_recent_journal(), parse_journal_entries(), read_journal_range(), collect_topic_stems(), and file preview rendering now go through the store. This completes the clean switch — no script reads archived markdown files.
199 lines
5.8 KiB
Python
199 lines
5.8 KiB
Python
"""store_helpers.py — shared helpers for scripts using the capnp store.
|
|
|
|
All memory content lives in the capnp store (poc-memory). These helpers
|
|
replace the old pattern of globbing ~/.claude/memory/*.md and parsing
|
|
section headers directly.
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
from functools import lru_cache
|
|
|
|
|
|
def _run_poc(args: list[str], timeout: int = 30) -> str:
|
|
"""Run a poc-memory command and return stdout."""
|
|
try:
|
|
result = subprocess.run(
|
|
["poc-memory"] + args,
|
|
capture_output=True, text=True, timeout=timeout
|
|
)
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def list_keys() -> list[str]:
    """Return every key printed by ``poc-memory list-keys``.

    The command output is split on newlines; each line is stripped and
    blank lines are dropped. An empty output yields an empty list.
    """
    found = []
    for line in _run_poc(["list-keys"]).split('\n'):
        key = line.strip()
        if key:
            found.append(key)
    return found
|
|
|
|
|
|
def get_semantic_keys() -> list[str]:
    """Return store keys that are neither journal entries nor system nodes.

    A key is dropped when it starts with ``journal.md#`` or is one of the
    fixed names listed in ``excluded`` below. Order follows ``list_keys()``.
    """
    excluded = ("journal.md", "MEMORY.md", "where-am-i.md",
                "work-queue.md", "work-state")
    semantic = []
    for key in list_keys():
        if key.startswith("journal.md#"):
            continue
        if key in excluded:
            continue
        semantic.append(key)
    return semantic
|
|
|
|
|
|
def get_journal_keys(n: int = 0) -> list[str]:
    """Return journal entry keys in descending string order.

    Keys look like ``journal.md#j-2026-02-28t23-19-slug``, so sorting the
    strings in reverse puts the newest dated entries first.

    Args:
        n: When greater than zero, only the first ``n`` keys of the sorted
            list (the most recent ones) are returned; otherwise all keys.
    """
    journal = sorted(
        (key for key in list_keys() if key.startswith("journal.md#")),
        reverse=True,
    )
    if n > 0:
        return journal[:n]
    return journal
|
|
|
|
|
|
def render(key: str) -> str:
    """Return the output of ``poc-memory render <key>`` for one node."""
    command = ["render", key]
    return _run_poc(command)
|
|
|
|
|
|
def get_recent_journal(n: int = 50) -> str:
    """Return the rendered text of recent journal entries, oldest first.

    Takes the keys from ``get_journal_keys(n)``, renders them in reverse
    (so the oldest of the selection comes first), skips entries that render
    to an empty string, and joins the rest with a blank line.
    """
    newest_first = get_journal_keys(n)
    rendered = (render(key) for key in newest_first[::-1])
    return "\n\n".join(body for body in rendered if body)
|
|
|
|
|
|
def get_journal_entries_by_date(target_date: str) -> list[dict]:
    """Collect the journal entries written on one date (YYYY-MM-DD).

    The date and time are taken from the key when it has the form
    ``journal.md#j-2026-02-28t23-19-slug``; otherwise the entry is rendered
    and its leading ``## YYYY-MM-DDTHH:MM`` header is used instead. Entries
    with neither, or with a different date, are skipped.

    Returns:
        A list of dicts, in ``get_journal_keys()`` order, each with:
        'key', 'date', 'time' (HH:MM), 'timestamp' (date + 'T' + time),
        'text' (rendered content with the header line removed, stripped)
        and 'source_ref' (contents of a ``<!-- source: ... -->`` marker,
        or None when absent).
    """
    key_stamp = re.compile(r'j-(\d{4}-\d{2}-\d{2})t(\d{2})-(\d{2})')
    results = []

    for key in get_journal_keys():
        stamp = key_stamp.search(key)
        if stamp is not None:
            # The key already names the date, so only render on a match.
            if stamp.group(1) != target_date:
                continue
            body = render(key)
        else:
            # Unnamed key: the date is only available in the rendered header.
            body = render(key)
            stamp = re.match(r'^## (\d{4}-\d{2}-\d{2})T(\d{2}):(\d{2})', body)
            if stamp is None or stamp.group(1) != target_date:
                continue

        day, hour, minute = stamp.groups()
        clock = f"{hour}:{minute}"

        source = re.search(r'<!-- source: (.+?) -->', body)
        without_header = re.sub(
            r'^## \d{4}-\d{2}-\d{2}T\d{2}:\d{2}\s*\n?', '', body
        )

        results.append({
            "key": key,
            "date": day,
            "time": clock,
            "timestamp": f"{day}T{clock}",
            "text": without_header.strip(),
            "source_ref": source.group(1) if source else None,
        })

    return results
|
|
|
|
|
|
def get_topic_file_index() -> dict[str, list[str]]:
    """Map each topic file to the section headers found in its keys.

    Every semantic key is split at its first ``#``: the part before is the
    filename, the part after becomes a ``## <section>`` header. Keys with
    no ``#`` still get an entry, with no header added for them.

    Returns:
        {filename: [section_headers]}.
    """
    index: dict[str, list[str]] = {}

    for key in get_semantic_keys():
        filename, hash_mark, section = key.partition('#')
        headers = index.setdefault(filename, [])
        if hash_mark:
            headers.append(f"## {section}")

    return index
|
|
|
|
|
|
def get_topic_summaries(max_chars_per_file: int = 500) -> str:
    """Return a truncated preview of every topic file, sorted by filename.

    Each file-level node is rendered; files that render to nothing are
    skipped. Content longer than ``max_chars_per_file`` characters is cut
    at that length and marked as truncated. Each preview is emitted under
    a ``### <filename>`` heading.
    """
    skipped = ("journal.md", "MEMORY.md", "where-am-i.md",
               "work-queue.md")
    sections = []

    for filename in sorted(get_topic_file_index()):
        if filename in skipped:
            continue

        body = render(filename)
        if not body:
            continue

        if len(body) > max_chars_per_file:
            body = body[:max_chars_per_file] + "\n[...truncated...]"

        sections.append(f"\n### {filename}\n{body}")

    return '\n'.join(sections)
|
|
|
|
|
|
def get_relations() -> str:
    """Return the raw output of ``poc-memory list-edges``."""
    command = ["list-edges"]
    return _run_poc(command)
|
|
|
|
|
|
def get_graph_stats() -> str:
    """Return a text report built from ``poc-memory status`` and ``graph``.

    Each command contributes a titled section only when it produced output;
    the graph section is limited to the first 150 lines of that output.
    """
    sections = []

    status_text = _run_poc(["status"])
    if status_text:
        sections.append(f"=== poc-memory status ===\n{status_text}")

    graph_text = _run_poc(["graph"])
    if graph_text:
        head = '\n'.join(graph_text.split('\n')[:150])
        sections.append(f"=== poc-memory graph (first 150 lines) ===\n{head}")

    return '\n'.join(sections)
|
|
|
|
|
|
def get_journal_range(start_date: str, end_date: str) -> str:
    """Return rendered journal entries dated within [start_date, end_date].

    Dates are compared as strings against the ``j-YYYY-MM-DD`` part of each
    key, so both bounds are inclusive; keys without that part are skipped.
    Entries are emitted oldest first, separated by a blank line, and only
    the last 500 lines of the combined text are kept.
    """
    key_date = re.compile(r'j-(\d{4}-\d{2}-\d{2})')
    chunks = []

    for key in get_journal_keys()[::-1]:  # oldest first
        found = key_date.search(key)
        if found is None:
            continue
        if not (start_date <= found.group(1) <= end_date):
            continue
        body = render(key)
        if body:
            chunks.append(body)

    combined = "\n\n".join(chunks)
    # Keeping the trailing 500 lines is a no-op for shorter text, because
    # splitting and re-joining on '\n' reproduces the original string.
    return '\n'.join(combined.split('\n')[-500:])
|