- New spectral module: Laplacian eigendecomposition of the memory graph. Commands: spectral, spectral-save, spectral-neighbors, spectral-positions, spectral-suggest. Spectral neighbors expand search results beyond keyword matching to structural proximity. - Search: use StoreView trait to avoid 6MB state.bin rewrite on every query. Append-only retrieval logging. Spectral expansion shows structurally nearby nodes after text results. - Fix panic in journal-tail: string truncation at byte 67 could land inside a multi-byte character (em dash). Now walks back to char boundary. - Replay queue: show classification and spectral outlier score. - Knowledge agents: extractor, challenger, connector prompts and runner scripts for automated graph enrichment. - memory-search hook: stale state file cleanup (24h expiry).
333 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""fact-mine.py — extract atomic factual claims from conversation transcripts.
|
|
|
|
Phase 1 of the fact-mining pipeline (see design/fact-mining-pipeline.md).
|
|
|
|
Usage:
|
|
fact-mine.py <jsonl_path> # mine one transcript
|
|
fact-mine.py --batch <directory> # mine all .jsonl in directory
|
|
fact-mine.py --dry-run <jsonl_path> # show chunks, don't call model
|
|
|
|
Output: JSON array of facts to stdout.
|
|
|
|
Each fact:
|
|
{
|
|
"claim": "bch2_trans_begin() sets up the transaction restart point",
|
|
"domain": "bcachefs/transaction",
|
|
"confidence": "stated",
|
|
"speaker": "Kent",
|
|
"source_line": 42,
|
|
"source_file": "c685c2a2-...jsonl"
|
|
}
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
# Rough token estimate: 1 token ≈ 4 chars for English text
CHARS_PER_TOKEN = 4
# Target window size handed to the model per call, in estimated tokens.
WINDOW_TOKENS = 2000
# Overlap between consecutive windows so a fact straddling a boundary
# appears whole in at least one chunk.
OVERLAP_TOKENS = 200
# Character equivalents consumed by chunk_text().
WINDOW_CHARS = WINDOW_TOKENS * CHARS_PER_TOKEN
OVERLAP_CHARS = OVERLAP_TOKENS * CHARS_PER_TOKEN
|
|
|
|
EXTRACTION_PROMPT = """Extract atomic factual claims from this conversation excerpt.
|
|
|
|
Each claim should be:
|
|
- A single verifiable statement
|
|
- Specific enough to be useful in isolation
|
|
- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal,
|
|
bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences,
|
|
linux/kernel, memory/design, identity/personal)
|
|
- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows),
|
|
or "speculative" (hypothesis, not confirmed)
|
|
- Include which speaker said it (Kent, PoC/ProofOfConcept, or Unknown)
|
|
|
|
Do NOT extract:
|
|
- Opinions or subjective assessments
|
|
- Conversational filler or greetings
|
|
- Things that are obviously common knowledge
|
|
- Restatements of the same fact (pick the clearest version)
|
|
- System messages, tool outputs, or error logs (extract what was LEARNED from them)
|
|
- Anything about the conversation itself ("Kent and PoC discussed...")
|
|
|
|
Output as a JSON array. Each element:
|
|
{
|
|
"claim": "the exact factual statement",
|
|
"domain": "category/subcategory",
|
|
"confidence": "stated|implied|speculative",
|
|
"speaker": "Kent|PoC|Unknown"
|
|
}
|
|
|
|
If the excerpt contains no extractable facts, output an empty array: []
|
|
|
|
--- CONVERSATION EXCERPT ---
|
|
"""
|
|
|
|
|
|
def extract_conversation(jsonl_path: str) -> list[dict]:
    """Pull user/assistant text messages out of a JSONL transcript.

    Returns a list of dicts with keys: line (1-based line number in the
    file), role ("Kent" for user, "PoC" for assistant), text, timestamp.
    Malformed JSON lines, non-conversation records, and very short
    messages are dropped.
    """

    def flatten(content):
        # A message body is either a plain string or a list of blocks;
        # anything else means "no usable text".
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            pieces = []
            for block in content:
                if isinstance(block, str):
                    pieces.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    t = block.get("text", "")
                    # Injected system reminders are not conversation.
                    if "<system-reminder>" in t:
                        continue
                    pieces.append(t)
            return "\n".join(pieces)
        return None

    out = []
    with open(jsonl_path) as fh:
        for lineno, raw in enumerate(fh, 1):
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue

            kind = record.get("type", "")
            if kind not in ("user", "assistant"):
                continue

            body = record.get("message", record)
            text = flatten(body.get("content"))
            if text is None:
                continue

            text = text.strip()
            # Drop empty and very short messages (likely acknowledgments).
            if len(text) < 20:
                continue

            out.append({
                "line": lineno,
                "role": "Kent" if kind == "user" else "PoC",
                "text": text,
                "timestamp": record.get("timestamp", ""),
            })

    return out
|
|
|
|
|
|
def format_for_extraction(messages: list[dict]) -> str:
    """Render extracted messages as one plain-text document for chunking."""
    rendered = []
    for m in messages:
        body = m["text"]
        # Cap huge individual messages (tool output, pasted code) so a
        # single message cannot dominate a chunk.
        if len(body) > 3000:
            body = body[:2800] + "\n[...truncated...]"
        stamp = m["timestamp"][:19] if m["timestamp"] else ""
        if stamp:
            header = f"[{m['role']} {stamp}]"
        else:
            header = f"[{m['role']}]"
        rendered.append(f"{header} {body}")
    return "\n\n".join(rendered)
|
|
|
|
|
|
def chunk_text(text: str) -> list[tuple[int, str]]:
    """Cut text into overlapping windows for extraction.

    Returns (start_char_offset, chunk_text) pairs.  A window prefers to
    end on a paragraph break when one falls in its second half.
    """
    out: list[tuple[int, str]] = []
    pos = 0
    total = len(text)

    while pos < total:
        stop = pos + WINDOW_CHARS
        piece = text[pos:stop]

        # Snap back to the last blank line, but only when that still
        # leaves at least half a window of content.
        if stop < total:
            cut = piece.rfind("\n\n")
            if cut > WINDOW_CHARS // 2:
                piece = piece[:cut]
                stop = pos + cut

        out.append((pos, piece))
        pos = stop - OVERLAP_CHARS
        # Guard against stalling when a window shrank below the overlap.
        if pos <= out[-1][0]:
            pos = stop

    return out
|
|
|
|
|
|
def call_haiku(prompt: str, timeout_secs: int = 60) -> str:
    """Send a prompt to Haiku via the `claude` CLI and return its stdout.

    The prompt is staged in a per-PID temp file and fed to the CLI on
    stdin.  On timeout or any other failure the function prints a note to
    stderr and returns "[]" (an empty JSON fact array) so callers never
    have to special-case errors.

    Args:
        prompt: full prompt text, including the conversation excerpt.
        timeout_secs: subprocess timeout passed to subprocess.run.

    Returns:
        The model's stdout, stripped; "[]" on any failure.
    """
    tmp = Path(f"/tmp/fact-mine-{os.getpid()}.txt")
    tmp.write_text(prompt)

    try:
        env = os.environ.copy()
        # Drop CLAUDECODE from the child env (kept from the original;
        # presumably avoids nested-session detection in the CLI).
        env.pop("CLAUDECODE", None)

        # Fix: the previous bare open(tmp) leaked the stdin file handle.
        # A context manager guarantees it is closed on every path.
        with open(tmp) as stdin_fh:
            result = subprocess.run(
                ["claude", "-p", "--model", "haiku", "--tools", ""],
                stdin=stdin_fh,
                capture_output=True,
                text=True,
                timeout=timeout_secs,
                env=env,
            )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        print(f" [timeout after {timeout_secs}s]", file=sys.stderr)
        return "[]"
    except Exception as e:
        # Best-effort by design: any failure degrades to "no facts".
        print(f" [error: {e}]", file=sys.stderr)
        return "[]"
    finally:
        tmp.unlink(missing_ok=True)
|
|
|
|
|
|
def parse_facts(response: str) -> list[dict]:
    """Extract the JSON array of facts from a model response.

    Tolerates markdown code fences and surrounding prose.  Returns []
    whenever no well-formed JSON array can be located.
    """
    cleaned = response.strip()

    # The model sometimes wraps output in a markdown code block; drop
    # every fence line.
    if cleaned.startswith("```"):
        cleaned = "\n".join(
            line for line in cleaned.split("\n") if not line.startswith("```")
        )

    # Take everything between the first '[' and the last ']'.
    lo = cleaned.find("[")
    hi = cleaned.rfind("]")
    if lo == -1 or hi == -1:
        return []

    try:
        parsed = json.loads(cleaned[lo:hi + 1])
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
|
|
|
|
|
|
def mine_transcript(jsonl_path: str, dry_run: bool = False) -> list[dict]:
    """Mine a single transcript for atomic facts.

    Pipeline: extract conversation -> format -> chunk -> model extraction
    per chunk -> provenance annotation -> case-insensitive dedup by claim.
    Progress is reported on stderr.

    Args:
        jsonl_path: path to one .jsonl transcript.
        dry_run: if True, print chunk previews to stdout and return []
            without calling the model.

    Returns:
        List of fact dicts (claim/domain/confidence/speaker from the
        model, plus source_file, source_chunk, source_offset).
    """
    filename = os.path.basename(jsonl_path)
    # Fix: this line previously printed a literal placeholder instead of
    # the transcript filename it had just computed.
    print(f"Mining: {filename}", file=sys.stderr)

    messages = extract_conversation(jsonl_path)
    if not messages:
        print(" No messages found", file=sys.stderr)
        return []

    print(f" {len(messages)} messages extracted", file=sys.stderr)

    text = format_for_extraction(messages)
    chunks = chunk_text(text)
    print(f" {len(chunks)} chunks ({len(text)} chars)", file=sys.stderr)

    if dry_run:
        # Preview mode: show what would be sent to the model, then stop.
        for i, (offset, chunk) in enumerate(chunks):
            print(f"\n--- Chunk {i+1} (offset {offset}, {len(chunk)} chars) ---")
            print(chunk[:500])
            if len(chunk) > 500:
                print(f" ... ({len(chunk) - 500} more chars)")
        return []

    all_facts = []
    for i, (offset, chunk) in enumerate(chunks):
        print(f" Chunk {i+1}/{len(chunks)} ({len(chunk)} chars)...",
              file=sys.stderr, end="", flush=True)

        response = call_haiku(EXTRACTION_PROMPT + chunk)
        facts = parse_facts(response)

        # Provenance: record where each fact came from.
        for fact in facts:
            fact["source_file"] = filename
            fact["source_chunk"] = i + 1
            fact["source_offset"] = offset

        all_facts.extend(facts)
        print(f" {len(facts)} facts", file=sys.stderr)

    # Deduplicate by normalized claim text; the first occurrence wins
    # (overlapping windows can surface the same claim twice).
    seen = set()
    unique_facts = []
    for fact in all_facts:
        claim_key = fact.get("claim", "").lower().strip()
        if claim_key and claim_key not in seen:
            seen.add(claim_key)
            unique_facts.append(fact)

    print(f" Total: {len(unique_facts)} unique facts "
          f"({len(all_facts) - len(unique_facts)} duplicates removed)",
          file=sys.stderr)
    return unique_facts
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, pick transcripts, mine, emit JSON."""
    import argparse

    parser = argparse.ArgumentParser(description="Extract atomic facts from conversations")
    parser.add_argument("path", help="JSONL file or directory (with --batch)")
    parser.add_argument("--batch", action="store_true",
                        help="Process all .jsonl files in directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show chunks without calling model")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")
    parser.add_argument("--min-messages", type=int, default=10,
                        help="Skip transcripts with fewer messages (default: 10)")
    args = parser.parse_args()

    if not args.batch:
        files = [Path(args.path)]
    else:
        jsonl_dir = Path(args.path)
        if not jsonl_dir.is_dir():
            print(f"Not a directory: {args.path}", file=sys.stderr)
            sys.exit(1)
        files = sorted(jsonl_dir.glob("*.jsonl"))
        print(f"Found {len(files)} transcripts", file=sys.stderr)

    all_facts = []
    for f in files:
        # Cheap pre-pass: don't bother mining near-empty transcripts.
        messages = extract_conversation(str(f))
        if len(messages) < args.min_messages:
            print(f"Skipping {f.name} ({len(messages)} messages < {args.min_messages})",
                  file=sys.stderr)
            continue

        all_facts.extend(mine_transcript(str(f), dry_run=args.dry_run))

    if not args.dry_run:
        output = json.dumps(all_facts, indent=2)
        if args.output:
            Path(args.output).write_text(output)
            print(f"\nWrote {len(all_facts)} facts to {args.output}", file=sys.stderr)
        else:
            print(output)

    print(f"\nTotal: {len(all_facts)} facts from {len(files)} transcripts",
          file=sys.stderr)
|
|
|
|
|
|
# Run the CLI only when executed as a script; importing stays side-effect free.
if __name__ == "__main__":
    main()
|