#!/usr/bin/env python3
"""fact-mine.py — extract atomic factual claims from conversation transcripts.

Phase 1 of the fact-mining pipeline (see design/fact-mining-pipeline.md).

Usage:
    fact-mine.py <transcript.jsonl>      # mine one transcript
    fact-mine.py --batch <dir>           # mine all .jsonl in directory
    fact-mine.py --dry-run <transcript>  # show chunks, don't call model

Output: JSON array of facts to stdout. Each fact:
    {
      "claim": "bch2_trans_begin() sets up the transaction restart point",
      "domain": "bcachefs/transaction",
      "confidence": "stated",
      "speaker": "Kent",
      "source_line": 42,
      "source_file": "c685c2a2-...jsonl"
    }
"""

import json
import os
import re
import subprocess
import sys
import hashlib
from pathlib import Path

# Rough token estimate: 1 token ≈ 4 chars for English text
CHARS_PER_TOKEN = 4
WINDOW_TOKENS = 2000
OVERLAP_TOKENS = 200
WINDOW_CHARS = WINDOW_TOKENS * CHARS_PER_TOKEN
OVERLAP_CHARS = OVERLAP_TOKENS * CHARS_PER_TOKEN

EXTRACTION_PROMPT = """Extract atomic factual claims from this conversation excerpt. Each claim should be:
- A single verifiable statement
- Specific enough to be useful in isolation
- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal, bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences, linux/kernel, memory/design, identity/personal)
- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows), or "speculative" (hypothesis, not confirmed)
- Include which speaker said it (Kent, PoC/ProofOfConcept, or Unknown)

Do NOT extract:
- Opinions or subjective assessments
- Conversational filler or greetings
- Things that are obviously common knowledge
- Restatements of the same fact (pick the clearest version)
- System messages, tool outputs, or error logs (extract what was LEARNED from them)
- Anything about the conversation itself ("Kent and PoC discussed...")

Output as a JSON array.
Each element: { "claim": "the exact factual statement", "domain": "category/subcategory", "confidence": "stated|implied|speculative", "speaker": "Kent|PoC|Unknown" }
If the excerpt contains no extractable facts, output an empty array: []

--- CONVERSATION EXCERPT ---
"""


def extract_conversation(jsonl_path: str) -> list[dict]:
    """Extract user/assistant text messages from a JSONL transcript.

    Skips unparseable lines, non-chat record types, non-text content blocks
    (tool_use, tool_result, thinking), system-reminder text, and messages
    shorter than 20 characters (likely bare acknowledgments).

    Returns:
        List of dicts: {line, role, text, timestamp}, where line is the
        1-based line number in the JSONL file and role is "Kent" for user
        messages, "PoC" for assistant messages.
    """
    messages = []
    with open(jsonl_path) as f:
        for i, line in enumerate(f, 1):
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            msg_type = obj.get("type", "")
            if msg_type not in ("user", "assistant"):
                continue
            timestamp = obj.get("timestamp", "")
            # Some transcript formats nest the payload under "message".
            msg = obj.get("message", obj)
            content = msg.get("content")
            if isinstance(content, str):
                text = content
            elif isinstance(content, list):
                # Extract text blocks only (skip tool_use, tool_result, thinking)
                texts = []
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "text":
                            t = block.get("text", "")
                            # Skip system reminders.
                            # BUG FIX: this check was `"" in t`, which is true
                            # for every string and silently dropped ALL text
                            # blocks. Reconstructed as a substring test for the
                            # system-reminder tag (presumed marker — confirm
                            # against actual transcripts).
                            if "<system-reminder>" in t:
                                continue
                            texts.append(t)
                    elif isinstance(block, str):
                        texts.append(block)
                text = "\n".join(texts)
            else:
                continue
            text = text.strip()
            if not text:
                continue
            # Skip very short messages (likely just acknowledgments)
            if len(text) < 20:
                continue
            role = "Kent" if msg_type == "user" else "PoC"
            messages.append({
                "line": i,
                "role": role,
                "text": text,
                "timestamp": timestamp,
            })
    return messages


def format_for_extraction(messages: list[dict]) -> str:
    """Format messages into a single speaker-tagged text for chunking.

    Each message becomes "[Role timestamp] text" (timestamp trimmed to
    seconds precision, omitted when absent); messages are joined with
    blank lines. Messages over 3000 chars are truncated to 2800 with a
    marker, since huge messages are usually tool output or code dumps.
    """
    parts = []
    for msg in messages:
        # Truncate very long individual messages (tool outputs, code dumps)
        text = msg["text"]
        if len(text) > 3000:
            text = text[:2800] + "\n[...truncated...]"
        ts = msg["timestamp"][:19] if msg["timestamp"] else ""
        prefix = f"[{msg['role']}]" if not ts else f"[{msg['role']} {ts}]"
        parts.append(f"{prefix} {text}")
    return "\n\n".join(parts)


def chunk_text(text: str) -> list[tuple[int, str]]:
    """Split text into overlapping windows.

    Windows are WINDOW_CHARS wide with OVERLAP_CHARS of overlap so facts
    straddling a boundary appear whole in at least one chunk. Each window
    is trimmed back to the last paragraph break ("\\n\\n") when one exists
    past the window midpoint, to avoid cutting mid-thought.

    Returns:
        List of (start_char_offset, chunk_text). Empty list for empty text.
    """
    chunks = []
    start = 0
    while start < len(text):
        end = start + WINDOW_CHARS
        chunk = text[start:end]
        # Try to break at a paragraph boundary
        if end < len(text):
            last_para = chunk.rfind("\n\n")
            if last_para > WINDOW_CHARS // 2:
                chunk = chunk[:last_para]
                end = start + last_para
        chunks.append((start, chunk))
        start = end - OVERLAP_CHARS
        if start <= chunks[-1][0]:
            # Avoid infinite loop on very small overlap
            start = end
    return chunks


def call_haiku(prompt: str, timeout_secs: int = 60) -> str:
    """Call Haiku via the `claude` CLI and return its stdout, stripped.

    The prompt is fed through stdin via subprocess.run(input=...), which
    replaces the old temp-file approach (`stdin=open(tmp)`) that leaked an
    unclosed file handle on every call.

    Returns "[]" (an empty JSON array) on timeout or any other failure, so
    callers can always hand the result to parse_facts().
    """
    try:
        env = os.environ.copy()
        # Drop the Claude Code marker so the nested CLI runs standalone.
        env.pop("CLAUDECODE", None)
        result = subprocess.run(
            ["claude", "-p", "--model", "haiku", "--tools", ""],
            input=prompt,
            capture_output=True,
            text=True,
            timeout=timeout_secs,
            env=env,
        )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        print(f" [timeout after {timeout_secs}s]", file=sys.stderr)
        return "[]"
    except Exception as e:
        print(f" [error: {e}]", file=sys.stderr)
        return "[]"


def parse_facts(response: str) -> list[dict]:
    """Parse a JSON array of facts from a model response.

    Tolerates markdown code fences and surrounding prose: strips ```-fence
    lines, then parses the span from the first "[" to the last "]".

    Returns [] when no valid JSON array can be found.
    """
    response = response.strip()
    # Strip markdown code block
    if response.startswith("```"):
        lines = response.split("\n")
        lines = [l for l in lines if not l.startswith("```")]
        response = "\n".join(lines)
    # Find the JSON array
    start = response.find("[")
    end = response.rfind("]")
    # end < start catches pathological responses like "] text ["
    if start == -1 or end == -1 or end < start:
        return []
    try:
        facts = json.loads(response[start:end + 1])
        if not isinstance(facts, list):
            return []
        return facts
    except json.JSONDecodeError:
        return []


def mine_transcript(jsonl_path: str, dry_run: bool = False) -> list[dict]:
    """Mine a single transcript for atomic facts.

    Extracts messages, chunks them, runs each chunk through the model,
    annotates each fact with source file/chunk/offset, and deduplicates
    by case-insensitive claim text (first occurrence wins).

    Args:
        jsonl_path: Path to the .jsonl transcript.
        dry_run: If True, print the chunk previews and return [] without
            calling the model.

    Returns:
        List of unique fact dicts.
    """
    filename = os.path.basename(jsonl_path)
    # BUG FIX: this previously printed the literal text "(unknown)" — the
    # f-string had lost its {filename} placeholder.
    print(f"Mining: {filename}", file=sys.stderr)

    messages = extract_conversation(jsonl_path)
    if not messages:
        print("  No messages found", file=sys.stderr)
        return []
    print(f"  {len(messages)} messages extracted", file=sys.stderr)

    text = format_for_extraction(messages)
    chunks = chunk_text(text)
    print(f"  {len(chunks)} chunks ({len(text)} chars)", file=sys.stderr)

    if dry_run:
        for i, (offset, chunk) in enumerate(chunks):
            print(f"\n--- Chunk {i+1} (offset {offset}, {len(chunk)} chars) ---")
            print(chunk[:500])
            if len(chunk) > 500:
                print(f"  ... ({len(chunk) - 500} more chars)")
        return []

    all_facts = []
    for i, (offset, chunk) in enumerate(chunks):
        print(f"  Chunk {i+1}/{len(chunks)} ({len(chunk)} chars)...",
              file=sys.stderr, end="", flush=True)
        prompt = EXTRACTION_PROMPT + chunk
        response = call_haiku(prompt)
        facts = parse_facts(response)
        # Annotate with source info
        for fact in facts:
            fact["source_file"] = filename
            fact["source_chunk"] = i + 1
            fact["source_offset"] = offset
        all_facts.extend(facts)
        print(f" {len(facts)} facts", file=sys.stderr)

    # Deduplicate by claim text (case-insensitive)
    seen = set()
    unique_facts = []
    for fact in all_facts:
        claim_key = fact.get("claim", "").lower().strip()
        if claim_key and claim_key not in seen:
            seen.add(claim_key)
            unique_facts.append(fact)

    print(f"  Total: {len(unique_facts)} unique facts "
          f"({len(all_facts) - len(unique_facts)} duplicates removed)",
          file=sys.stderr)
    return unique_facts


def main():
    """CLI entry point: parse args, mine one or many transcripts, emit JSON."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Extract atomic facts from conversations")
    parser.add_argument("path", help="JSONL file or directory (with --batch)")
    parser.add_argument("--batch", action="store_true",
                        help="Process all .jsonl files in directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show chunks without calling model")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")
    parser.add_argument("--min-messages", type=int, default=10,
                        help="Skip transcripts with fewer messages (default: 10)")
    args = parser.parse_args()

    if args.batch:
        jsonl_dir = Path(args.path)
        if not jsonl_dir.is_dir():
            print(f"Not a directory: {args.path}", file=sys.stderr)
            sys.exit(1)
        files = sorted(jsonl_dir.glob("*.jsonl"))
        print(f"Found {len(files)} transcripts", file=sys.stderr)
    else:
        files = [Path(args.path)]

    all_facts = []
    for f in files:
        # Quick check: skip tiny files. (NOTE: extract_conversation runs again
        # inside mine_transcript — redundant parse, kept for simplicity.)
        messages = extract_conversation(str(f))
        if len(messages) < args.min_messages:
            print(f"Skipping {f.name} ({len(messages)} messages < {args.min_messages})",
                  file=sys.stderr)
            continue
        facts = mine_transcript(str(f), dry_run=args.dry_run)
        all_facts.extend(facts)

    if not args.dry_run:
        output = json.dumps(all_facts, indent=2)
        if args.output:
            Path(args.output).write_text(output)
            print(f"\nWrote {len(all_facts)} facts to {args.output}",
                  file=sys.stderr)
        else:
            print(output)
        print(f"\nTotal: {len(all_facts)} facts from {len(files)} transcripts",
              file=sys.stderr)


if __name__ == "__main__":
    main()