spectral decomposition, search improvements, char boundary fix

- New spectral module: Laplacian eigendecomposition of the memory graph.
  Commands: spectral, spectral-save, spectral-neighbors, spectral-positions,
  spectral-suggest. Spectral neighbors expand search results beyond keyword
  matching to structural proximity.

- Search: use StoreView trait to avoid 6MB state.bin rewrite on every query.
  Append-only retrieval logging. Spectral expansion shows structurally
  nearby nodes after text results.

- Fix panic in journal-tail: string truncation at byte 67 could land inside
  a multi-byte character (em dash). Now walks back to char boundary.

- Replay queue: show classification and spectral outlier score.

- Knowledge agents: extractor, challenger, connector prompts and runner
  scripts for automated graph enrichment.

- memory-search hook: stale state file cleanup (24h expiry).
This commit is contained in:
ProofOfConcept 2026-03-03 01:33:31 -05:00
parent 94dbca6018
commit 71e6f15d82
16 changed files with 3600 additions and 103 deletions

333
scripts/fact-mine.py Executable file
View file

@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""fact-mine.py — extract atomic factual claims from conversation transcripts.
Phase 1 of the fact-mining pipeline (see design/fact-mining-pipeline.md).
Usage:
fact-mine.py <jsonl_path> # mine one transcript
fact-mine.py --batch <directory> # mine all .jsonl in directory
fact-mine.py --dry-run <jsonl_path> # show chunks, don't call model
Output: JSON array of facts to stdout.
Each fact:
{
"claim": "bch2_trans_begin() sets up the transaction restart point",
"domain": "bcachefs/transaction",
"confidence": "stated",
"speaker": "Kent",
"source_line": 42,
"source_file": "c685c2a2-...jsonl"
}
"""
import json
import os
import re
import subprocess
import sys
import hashlib
from pathlib import Path
# Rough token estimate: 1 token ≈ 4 chars for English text
CHARS_PER_TOKEN = 4
# Extraction window size and overlap between consecutive windows, in tokens.
WINDOW_TOKENS = 2000
OVERLAP_TOKENS = 200
# The same quantities expressed in characters; these are what chunk_text() uses.
WINDOW_CHARS = WINDOW_TOKENS * CHARS_PER_TOKEN
OVERLAP_CHARS = OVERLAP_TOKENS * CHARS_PER_TOKEN
# Instruction prompt sent to the model for each chunk. The chunk text is
# appended directly after the trailing "--- CONVERSATION EXCERPT ---" marker
# (see mine_transcript). The runtime string itself must not be edited.
EXTRACTION_PROMPT = """Extract atomic factual claims from this conversation excerpt.
Each claim should be:
- A single verifiable statement
- Specific enough to be useful in isolation
- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal,
bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences,
linux/kernel, memory/design, identity/personal)
- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows),
or "speculative" (hypothesis, not confirmed)
- Include which speaker said it (Kent, PoC/ProofOfConcept, or Unknown)
Do NOT extract:
- Opinions or subjective assessments
- Conversational filler or greetings
- Things that are obviously common knowledge
- Restatements of the same fact (pick the clearest version)
- System messages, tool outputs, or error logs (extract what was LEARNED from them)
- Anything about the conversation itself ("Kent and PoC discussed...")
Output as a JSON array. Each element:
{
"claim": "the exact factual statement",
"domain": "category/subcategory",
"confidence": "stated|implied|speculative",
"speaker": "Kent|PoC|Unknown"
}
If the excerpt contains no extractable facts, output an empty array: []
--- CONVERSATION EXCERPT ---
"""
def extract_conversation(jsonl_path: str) -> list[dict]:
    """Extract user/assistant text messages from a JSONL transcript.

    Malformed JSON lines, non-object records, tool_use/tool_result/thinking
    blocks, system reminders, and very short messages (< 20 chars) are all
    skipped.

    Args:
        jsonl_path: path to the .jsonl transcript file.

    Returns:
        List of dicts: {line, role, text, timestamp}, where line is the
        1-based line number in the source file and role is "Kent" for
        user messages, "PoC" for assistant messages.
    """
    messages = []
    with open(jsonl_path) as f:
        for i, line in enumerate(f, 1):
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Bug fix: a syntactically valid line may decode to a scalar or
            # array, which would crash .get() below with AttributeError.
            if not isinstance(obj, dict):
                continue
            msg_type = obj.get("type", "")
            if msg_type not in ("user", "assistant"):
                continue
            timestamp = obj.get("timestamp", "")
            msg = obj.get("message", obj)
            # Same guard for a non-object "message" field.
            if not isinstance(msg, dict):
                continue
            content = msg.get("content")
            if isinstance(content, str):
                text = content
            elif isinstance(content, list):
                # Extract text blocks only (skip tool_use, tool_result, thinking)
                texts = []
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "text":
                            t = block.get("text", "")
                            # Skip system reminders
                            if "<system-reminder>" in t:
                                continue
                            texts.append(t)
                    elif isinstance(block, str):
                        texts.append(block)
                text = "\n".join(texts)
            else:
                continue
            text = text.strip()
            if not text:
                continue
            # Skip very short messages (likely just acknowledgments)
            if len(text) < 20:
                continue
            role = "Kent" if msg_type == "user" else "PoC"
            messages.append({
                "line": i,
                "role": role,
                "text": text,
                "timestamp": timestamp,
            })
    return messages
def format_for_extraction(messages: list[dict]) -> str:
    """Render extracted messages as one prefixed text blob for chunking.

    Each message becomes "[role timestamp] text" (timestamp trimmed to
    seconds precision, omitted when empty); messages are joined by blank
    lines. Overlong messages are capped to keep chunks model-sized.
    """
    rendered = []
    for entry in messages:
        body = entry["text"]
        # Cap very long individual messages (tool outputs, code dumps).
        if len(body) > 3000:
            body = body[:2800] + "\n[...truncated...]"
        stamp = entry["timestamp"][:19] if entry["timestamp"] else ""
        header = f"[{entry['role']} {stamp}]" if stamp else f"[{entry['role']}]"
        rendered.append(f"{header} {body}")
    return "\n\n".join(rendered)
def chunk_text(text: str) -> list[tuple[int, str]]:
"""Split text into overlapping windows.
Returns list of (start_char_offset, chunk_text).
"""
chunks = []
start = 0
while start < len(text):
end = start + WINDOW_CHARS
chunk = text[start:end]
# Try to break at a paragraph boundary
if end < len(text):
last_para = chunk.rfind("\n\n")
if last_para > WINDOW_CHARS // 2:
chunk = chunk[:last_para]
end = start + last_para
chunks.append((start, chunk))
start = end - OVERLAP_CHARS
if start <= chunks[-1][0]:
# Avoid infinite loop on very small overlap
start = end
return chunks
def call_haiku(prompt: str, timeout_secs: int = 60) -> str:
    """Call Haiku via the `claude` CLI.

    The prompt is written to a per-PID temp file and piped to
    `claude -p --model haiku --tools ""`.

    Args:
        prompt: full prompt text to send.
        timeout_secs: kill the subprocess after this many seconds.

    Returns:
        The model's stdout (stripped), or "[]" on timeout or any error.
    """
    tmp = Path(f"/tmp/fact-mine-{os.getpid()}.txt")
    tmp.write_text(prompt)
    try:
        env = os.environ.copy()
        # Unset CLAUDECODE so the CLI doesn't think it's nested inside
        # an existing Claude Code session.
        env.pop("CLAUDECODE", None)
        # Bug fix: the stdin handle was opened without ever being closed,
        # leaking a file descriptor per call. Use a context manager.
        with open(tmp) as stdin_f:
            result = subprocess.run(
                ["claude", "-p", "--model", "haiku", "--tools", ""],
                stdin=stdin_f,
                capture_output=True,
                text=True,
                timeout=timeout_secs,
                env=env,
            )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        print(f" [timeout after {timeout_secs}s]", file=sys.stderr)
        return "[]"
    except Exception as e:
        print(f" [error: {e}]", file=sys.stderr)
        return "[]"
    finally:
        tmp.unlink(missing_ok=True)
def parse_facts(response: str) -> list[dict]:
    """Extract a JSON array of facts from a raw model response.

    Tolerates markdown code fences and surrounding chatter. Returns an
    empty list when no decodable JSON array is present, or when the
    decoded value is not a list.
    """
    body = response.strip()
    # Drop fence lines when the reply is wrapped in a markdown code block.
    if body.startswith("```"):
        body = "\n".join(
            ln for ln in body.split("\n") if not ln.startswith("```")
        )
    # Locate the outermost [...] span and try to decode just that slice.
    lo = body.find("[")
    hi = body.rfind("]")
    if lo == -1 or hi == -1:
        return []
    try:
        parsed = json.loads(body[lo:hi + 1])
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
def mine_transcript(jsonl_path: str, dry_run: bool = False) -> list[dict]:
    """Mine a single transcript for atomic facts.

    Pipeline: extract messages, format and chunk them, send each chunk to
    the model, parse and annotate the returned facts, then de-duplicate
    by normalized claim text. Progress goes to stderr.

    Args:
        jsonl_path: path to the transcript .jsonl file.
        dry_run: when True, print chunk previews to stdout and return []
            without calling the model.

    Returns:
        List of unique fact dicts, each annotated with source_file,
        source_chunk, and source_offset.
    """
    filename = os.path.basename(jsonl_path)
    # Bug fix: this previously printed a literal placeholder instead of
    # the filename computed on the line above.
    print(f"Mining: {filename}", file=sys.stderr)
    messages = extract_conversation(jsonl_path)
    if not messages:
        print(f" No messages found", file=sys.stderr)
        return []
    print(f" {len(messages)} messages extracted", file=sys.stderr)
    text = format_for_extraction(messages)
    chunks = chunk_text(text)
    print(f" {len(chunks)} chunks ({len(text)} chars)", file=sys.stderr)
    if dry_run:
        for i, (offset, chunk) in enumerate(chunks):
            print(f"\n--- Chunk {i+1} (offset {offset}, {len(chunk)} chars) ---")
            print(chunk[:500])
            if len(chunk) > 500:
                print(f" ... ({len(chunk) - 500} more chars)")
        return []
    all_facts = []
    for i, (offset, chunk) in enumerate(chunks):
        print(f" Chunk {i+1}/{len(chunks)} ({len(chunk)} chars)...",
              file=sys.stderr, end="", flush=True)
        prompt = EXTRACTION_PROMPT + chunk
        response = call_haiku(prompt)
        facts = parse_facts(response)
        # Annotate each fact with provenance for downstream verification.
        for fact in facts:
            fact["source_file"] = filename
            fact["source_chunk"] = i + 1
            fact["source_offset"] = offset
        all_facts.extend(facts)
        print(f" {len(facts)} facts", file=sys.stderr)
    # Deduplicate by claim text (case-insensitive); keep first occurrence.
    seen = set()
    unique_facts = []
    for fact in all_facts:
        claim_key = fact.get("claim", "").lower().strip()
        if claim_key and claim_key not in seen:
            seen.add(claim_key)
            unique_facts.append(fact)
    print(f" Total: {len(unique_facts)} unique facts "
          f"({len(all_facts) - len(unique_facts)} duplicates removed)",
          file=sys.stderr)
    return unique_facts
def main():
    """CLI entry point: parse arguments, select transcripts, mine, emit JSON."""
    import argparse
    parser = argparse.ArgumentParser(description="Extract atomic facts from conversations")
    parser.add_argument("path", help="JSONL file or directory (with --batch)")
    parser.add_argument("--batch", action="store_true",
                        help="Process all .jsonl files in directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show chunks without calling model")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")
    parser.add_argument("--min-messages", type=int, default=10,
                        help="Skip transcripts with fewer messages (default: 10)")
    args = parser.parse_args()

    # Build the worklist: one file, or every .jsonl under a directory.
    if not args.batch:
        files = [Path(args.path)]
    else:
        jsonl_dir = Path(args.path)
        if not jsonl_dir.is_dir():
            print(f"Not a directory: {args.path}", file=sys.stderr)
            sys.exit(1)
        files = sorted(jsonl_dir.glob("*.jsonl"))
        print(f"Found {len(files)} transcripts", file=sys.stderr)

    all_facts = []
    for f in files:
        # Pre-scan so tiny transcripts are skipped before any model calls.
        messages = extract_conversation(str(f))
        if len(messages) < args.min_messages:
            print(f"Skipping {f.name} ({len(messages)} messages < {args.min_messages})",
                  file=sys.stderr)
            continue
        all_facts.extend(mine_transcript(str(f), dry_run=args.dry_run))

    if not args.dry_run:
        output = json.dumps(all_facts, indent=2)
        if args.output:
            Path(args.output).write_text(output)
            print(f"\nWrote {len(all_facts)} facts to {args.output}", file=sys.stderr)
        else:
            print(output)
    print(f"\nTotal: {len(all_facts)} facts from {len(files)} transcripts",
          file=sys.stderr)
# Script entry point guard: allows the module to be imported without
# triggering the CLI.
if __name__ == "__main__":
    main()