move LLM-dependent modules into agents/ subdir

Separate the agent layer (everything that calls external LLMs or
orchestrates sequences of such calls) from core graph infrastructure.

agents/: llm, prompts, audit, consolidate, knowledge, enrich,
         fact_mine, digest, daemon

Root: store/, graph, spectral, search, similarity, lookups, query,
      config, util, migrate, neuro/ (scoring + rewrite)

Re-exports at crate root preserve backwards compatibility so
`crate::llm`, `crate::digest` etc. continue to work.
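
A sketch of the compatibility shims (illustrative; the exact lib.rs
form may differ):

    pub mod agents;

    // Old paths keep resolving at the crate root.
    pub use agents::{llm, prompts, audit, consolidate, knowledge,
                     enrich, fact_mine, digest, daemon};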
ProofOfConcept 2026-03-08 21:27:41 -04:00
parent 3dddc40841
commit cee9b76a7b
13 changed files with 68 additions and 46 deletions


@@ -0,0 +1,333 @@
// Link audit: walk every link in the graph, batch to Sonnet for quality review.
//
// Each batch of links gets reviewed by Sonnet, which returns per-link actions:
// KEEP, DELETE, RETARGET, WEAKEN, STRENGTHEN. Batches run in parallel via rayon.
use super::llm::call_sonnet;
use crate::store::{self, Store, new_relation};
use std::collections::HashSet;
struct LinkInfo {
rel_idx: usize,
source_key: String,
target_key: String,
source_content: String,
target_content: String,
strength: f32,
target_sections: Vec<String>,
}
pub struct AuditStats {
pub kept: usize,
pub deleted: usize,
pub retargeted: usize,
pub weakened: usize,
pub strengthened: usize,
pub errors: usize,
}
fn build_audit_prompt(batch: &[LinkInfo], batch_num: usize, total_batches: usize) -> String {
let mut prompt = format!(
"You are auditing memory graph links for quality (batch {}/{}).\n\n\
For each numbered link, decide what to do:\n\n\
KEEP N                link is meaningful, leave it\n\
DELETE N              link is noise, accidental, or too generic to be useful\n\
RETARGET N new_key    link points to the right topic area but wrong node;\n\
\x20                     retarget to a more specific section (listed under each link)\n\
WEAKEN N strength     link is marginal; reduce strength (0.1-0.3)\n\
STRENGTHEN N strength link is important but underweighted; increase (0.8-1.0)\n\n\
Output exactly one action per link number, nothing else.\n\n\
Links to review:\n\n",
batch_num, total_batches);
for (i, link) in batch.iter().enumerate() {
let n = i + 1;
prompt.push_str(&format!(
"--- Link {} ---\n\
{} {} (strength={:.2})\n\n\
Source content:\n{}\n\n\
Target content:\n{}\n",
n, link.source_key, link.target_key, link.strength,
&link.source_content, &link.target_content));
if !link.target_sections.is_empty() {
prompt.push_str(
"\nTarget has sections (consider RETARGET to a more specific one):\n");
for s in &link.target_sections {
prompt.push_str(&format!(" - {}\n", s));
}
}
prompt.push('\n');
}
prompt
}
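// The response is parsed line by line; each line must start with an action
// word and a 1-based link number. An illustrative response (the keys here
// are hypothetical):
//
// KEEP 1
// DELETE 2
// RETARGET 3 notes#setup
// WEAKEN 4 0.2
// STRENGTHEN 5 0.9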
fn parse_audit_response(response: &str, batch_size: usize) -> Vec<(usize, AuditAction)> {
let mut actions = Vec::new();
for line in response.lines() {
let line = line.trim();
if line.is_empty() { continue; }
let parts: Vec<&str> = line.splitn(3, ' ').collect();
if parts.len() < 2 { continue; }
let action = parts[0].to_uppercase();
let idx: usize = match parts[1].parse::<usize>() {
Ok(n) if n >= 1 && n <= batch_size => n - 1,
_ => continue,
};
let audit_action = match action.as_str() {
"KEEP" => AuditAction::Keep,
"DELETE" => AuditAction::Delete,
"RETARGET" => {
if parts.len() < 3 { continue; }
AuditAction::Retarget(parts[2].trim().to_string())
}
"WEAKEN" => {
if parts.len() < 3 { continue; }
match parts[2].trim().parse::<f32>() {
Ok(s) => AuditAction::Weaken(s),
Err(_) => continue,
}
}
"STRENGTHEN" => {
if parts.len() < 3 { continue; }
match parts[2].trim().parse::<f32>() {
Ok(s) => AuditAction::Strengthen(s),
Err(_) => continue,
}
}
_ => continue,
};
actions.push((idx, audit_action));
}
actions
}
enum AuditAction {
Keep,
Delete,
Retarget(String),
Weaken(f32),
Strengthen(f32),
}
/// Run a full link audit: walk every link, batch to Sonnet, apply results.
pub fn link_audit(store: &mut Store, apply: bool) -> Result<AuditStats, String> {
// Collect all non-deleted relations with their info
let mut links: Vec<LinkInfo> = Vec::new();
for (idx, rel) in store.relations.iter().enumerate() {
if rel.deleted { continue; }
let source_content = store.nodes.get(&rel.source_key)
.map(|n| n.content.clone()).unwrap_or_default();
let target_content = store.nodes.get(&rel.target_key)
.map(|n| n.content.clone()).unwrap_or_default();
// Find section children of target if it's file-level
let target_sections = if !rel.target_key.contains('#') {
let prefix = format!("{}#", rel.target_key);
store.nodes.keys()
.filter(|k| k.starts_with(&prefix))
.cloned()
.collect()
} else {
Vec::new()
};
links.push(LinkInfo {
rel_idx: idx,
source_key: rel.source_key.clone(),
target_key: rel.target_key.clone(),
source_content,
target_content,
strength: rel.strength,
target_sections,
});
}
let total = links.len();
println!("Link audit: {} links to review", total);
if !apply {
println!("DRY RUN — use --apply to make changes");
}
// Batch by char budget (~100K chars per prompt)
let char_budget = 100_000usize;
let mut batches: Vec<Vec<usize>> = Vec::new();
let mut current_batch: Vec<usize> = Vec::new();
let mut current_chars = 0usize;
for (i, link) in links.iter().enumerate() {
let link_chars = link.source_content.len() + link.target_content.len() + 200;
if !current_batch.is_empty() && current_chars + link_chars > char_budget {
batches.push(std::mem::take(&mut current_batch));
current_chars = 0;
}
current_batch.push(i);
current_chars += link_chars;
}
if !current_batch.is_empty() {
batches.push(current_batch);
}
let total_batches = batches.len();
println!("{} batches (avg {} links/batch)\n", total_batches,
if total_batches > 0 { total / total_batches } else { 0 });
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
// Build all batch prompts up front
let batch_data: Vec<(usize, Vec<LinkInfo>, String)> = batches.iter().enumerate()
.map(|(batch_idx, batch_indices)| {
let batch_infos: Vec<LinkInfo> = batch_indices.iter().map(|&i| {
let l = &links[i];
LinkInfo {
rel_idx: l.rel_idx,
source_key: l.source_key.clone(),
target_key: l.target_key.clone(),
source_content: l.source_content.clone(),
target_content: l.target_content.clone(),
strength: l.strength,
target_sections: l.target_sections.clone(),
}
}).collect();
let prompt = build_audit_prompt(&batch_infos, batch_idx + 1, total_batches);
(batch_idx, batch_infos, prompt)
})
.collect();
// Progress counter
let done = AtomicUsize::new(0);
// Run batches in parallel via rayon
let batch_results: Vec<_> = batch_data.par_iter()
.map(|(batch_idx, batch_infos, prompt)| {
let response = call_sonnet("audit", prompt);
let completed = done.fetch_add(1, Ordering::Relaxed) + 1;
eprint!("\r Batches: {}/{} done", completed, total_batches);
(*batch_idx, batch_infos, response)
})
.collect();
eprintln!(); // newline after progress
// Process results sequentially
let mut stats = AuditStats {
kept: 0, deleted: 0, retargeted: 0, weakened: 0, strengthened: 0, errors: 0,
};
let mut deletions: Vec<usize> = Vec::new();
let mut retargets: Vec<(usize, String)> = Vec::new();
let mut strength_changes: Vec<(usize, f32)> = Vec::new();
for (batch_idx, batch_infos, response) in &batch_results {
let response = match response {
Ok(r) => r,
Err(e) => {
eprintln!(" Batch {}: error: {}", batch_idx + 1, e);
stats.errors += batch_infos.len();
continue;
}
};
let actions = parse_audit_response(response, batch_infos.len());
let mut responded: HashSet<usize> = HashSet::new();
for (idx, action) in &actions {
responded.insert(*idx);
let link = &batch_infos[*idx];
match action {
AuditAction::Keep => {
stats.kept += 1;
}
AuditAction::Delete => {
println!(" DELETE {}{}", link.source_key, link.target_key);
deletions.push(link.rel_idx);
stats.deleted += 1;
}
AuditAction::Retarget(new_target) => {
println!(" RETARGET {}{} (was {})",
link.source_key, new_target, link.target_key);
retargets.push((link.rel_idx, new_target.clone()));
stats.retargeted += 1;
}
AuditAction::Weaken(s) => {
println!(" WEAKEN {}{} (str {:.2}{:.2})",
link.source_key, link.target_key, link.strength, s);
strength_changes.push((link.rel_idx, *s));
stats.weakened += 1;
}
AuditAction::Strengthen(s) => {
println!(" STRENGTHEN {}{} (str {:.2}{:.2})",
link.source_key, link.target_key, link.strength, s);
strength_changes.push((link.rel_idx, *s));
stats.strengthened += 1;
}
}
}
for i in 0..batch_infos.len() {
if !responded.contains(&i) {
stats.kept += 1;
}
}
println!(" Batch {}/{}: +{}kept +{}del +{}retarget +{}weak +{}strong",
batch_idx + 1, total_batches,
stats.kept, stats.deleted, stats.retargeted, stats.weakened, stats.strengthened);
}
// Apply changes
if apply && (stats.deleted > 0 || stats.retargeted > 0
|| stats.weakened > 0 || stats.strengthened > 0) {
println!("\nApplying changes...");
// Deletions: soft-delete
for rel_idx in &deletions {
store.relations[*rel_idx].deleted = true;
}
// Strength changes
for (rel_idx, new_strength) in &strength_changes {
store.relations[*rel_idx].strength = *new_strength;
}
// Retargets: soft-delete old, create new
for (rel_idx, new_target) in &retargets {
let source_key = store.relations[*rel_idx].source_key.clone();
let old_strength = store.relations[*rel_idx].strength;
let source_uuid = store.nodes.get(&source_key)
.map(|n| n.uuid).unwrap_or([0u8; 16]);
let target_uuid = store.nodes.get(new_target)
.map(|n| n.uuid).unwrap_or([0u8; 16]);
// Soft-delete old
store.relations[*rel_idx].deleted = true;
// Create new
if target_uuid != [0u8; 16] {
let new_rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Auto,
old_strength,
&source_key, new_target,
);
store.add_relation(new_rel).ok();
}
}
store.save()?;
println!("Saved.");
}
Ok(stats)
}
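// Hypothetical CLI wiring (not part of this file), showing the intended
// dry-run-then-apply flow:
//
// let mut store = Store::load()?;
// let stats = link_audit(&mut store, false)?; // dry run: report only
// println!("kept={} deleted={} retargeted={}",
// stats.kept, stats.deleted, stats.retargeted);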


@@ -0,0 +1,412 @@
// Consolidation pipeline: plan → agents → apply → digests → links
//
// consolidate_full() runs the full autonomous consolidation:
// 1. Plan: analyze metrics, allocate agents
// 2. Execute: run each agent (Sonnet calls), save reports
// 3. Apply: extract and apply actions from reports
// 4. Digest: generate missing daily/weekly/monthly digests
// 5. Links: apply links extracted from digests
// 6. Summary: final metrics comparison
//
// apply_consolidation() processes consolidation reports independently.
use super::digest;
use super::llm::{call_sonnet, parse_json_response};
use crate::neuro;
use crate::store::{self, Store, new_relation};
/// Append a line to the log buffer.
fn log_line(buf: &mut String, line: &str) {
buf.push_str(line);
buf.push('\n');
}
/// Run the full autonomous consolidation pipeline with logging.
pub fn consolidate_full(store: &mut Store) -> Result<(), String> {
consolidate_full_with_progress(store, &|_| {})
}
/// As `consolidate_full`, but `on_progress` is called at each significant step.
pub fn consolidate_full_with_progress(
store: &mut Store,
on_progress: &dyn Fn(&str),
) -> Result<(), String> {
let start = std::time::Instant::now();
let log_key = format!("_consolidate-log-{}",
store::format_datetime(store::now_epoch()).replace([':', '-', 'T'], ""));
let mut log_buf = String::new();
log_line(&mut log_buf, "=== CONSOLIDATE FULL ===");
log_line(&mut log_buf, &format!("Started: {}", store::format_datetime(store::now_epoch())));
log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()));
log_line(&mut log_buf, "");
// --- Step 1: Plan ---
log_line(&mut log_buf, "--- Step 1: Plan ---");
on_progress("planning");
let plan = neuro::consolidation_plan(store);
let plan_text = neuro::format_plan(&plan);
log_line(&mut log_buf, &plan_text);
println!("{}", plan_text);
let total_agents = plan.replay_count + plan.linker_count
+ plan.separator_count + plan.transfer_count
+ if plan.run_health { 1 } else { 0 };
log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents));
// --- Step 2: Execute agents ---
log_line(&mut log_buf, "\n--- Step 2: Execute agents ---");
let mut reports: Vec<String> = Vec::new();
let mut agent_num = 0usize;
let mut agent_errors = 0usize;
// Build the list of (agent_type, batch_size) runs
let mut runs: Vec<(&str, usize)> = Vec::new();
if plan.run_health {
runs.push(("health", 0));
}
let batch_size = 5;
for (name, count) in [
("replay", plan.replay_count),
("linker", plan.linker_count),
("separator", plan.separator_count),
("transfer", plan.transfer_count),
] {
let mut remaining = count;
while remaining > 0 {
let batch = remaining.min(batch_size);
runs.push((name, batch));
remaining -= batch;
}
}
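// Illustrative: with batch_size = 5, a replay_count of 12 expands to three
// runs of sizes 5, 5, 2; health (if enabled) runs once with count 0.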
for (agent_type, count) in &runs {
agent_num += 1;
let label = if *count > 0 {
format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count)
} else {
format!("[{}/{}] {}", agent_num, runs.len(), agent_type)
};
log_line(&mut log_buf, &format!("\n{}", label));
on_progress(&label);
println!("{}", label);
// Reload store to pick up changes from previous agents
if agent_num > 1 {
*store = Store::load()?;
}
let prompt = match super::prompts::agent_prompt(store, agent_type, *count) {
Ok(p) => p,
Err(e) => {
let msg = format!(" ERROR building prompt: {}", e);
log_line(&mut log_buf, &msg);
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens)",
prompt.len(), prompt.len() / 4));
let response = match call_sonnet("consolidate", &prompt) {
Ok(r) => r,
Err(e) => {
let msg = format!(" ERROR from Sonnet: {}", e);
log_line(&mut log_buf, &msg);
eprintln!("{}", msg);
agent_errors += 1;
continue;
}
};
// Store report as a node
let ts = store::format_datetime(store::now_epoch())
.replace([':', '-', 'T'], "");
let report_key = format!("_consolidation-{}-{}", agent_type, ts);
store.upsert_provenance(&report_key, &response,
store::Provenance::AgentConsolidate).ok();
reports.push(report_key.clone());
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key);
log_line(&mut log_buf, &msg);
on_progress(&msg);
println!("{}", msg);
}
log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors",
agent_num - agent_errors, agent_errors));
// --- Step 3: Apply consolidation actions ---
log_line(&mut log_buf, "\n--- Step 3: Apply consolidation actions ---");
on_progress("applying actions");
println!("\n--- Applying consolidation actions ---");
*store = Store::load()?;
if reports.is_empty() {
log_line(&mut log_buf, " No reports to apply.");
} else {
match apply_consolidation(store, true, None) {
Ok(()) => log_line(&mut log_buf, " Applied."),
Err(e) => {
let msg = format!(" ERROR applying consolidation: {}", e);
log_line(&mut log_buf, &msg);
eprintln!("{}", msg);
}
}
}
// --- Step 3b: Link orphans ---
log_line(&mut log_buf, "\n--- Step 3b: Link orphans ---");
on_progress("linking orphans");
println!("\n--- Linking orphan nodes ---");
*store = Store::load()?;
let (lo_orphans, lo_added) = neuro::link_orphans(store, 2, 3, 0.15);
log_line(&mut log_buf, &format!(" {} orphans, {} links added", lo_orphans, lo_added));
// --- Step 3c: Cap degree ---
log_line(&mut log_buf, "\n--- Step 3c: Cap degree ---");
on_progress("capping degree");
println!("\n--- Capping node degree ---");
*store = Store::load()?;
match store.cap_degree(50) {
Ok((hubs, pruned)) => {
store.save()?;
log_line(&mut log_buf, &format!(" {} hubs capped, {} edges pruned", hubs, pruned));
}
Err(e) => log_line(&mut log_buf, &format!(" ERROR: {}", e)),
}
// --- Step 4: Digest auto ---
log_line(&mut log_buf, "\n--- Step 4: Digest auto ---");
on_progress("generating digests");
println!("\n--- Generating missing digests ---");
*store = Store::load()?;
match digest::digest_auto(store) {
Ok(()) => log_line(&mut log_buf, " Digests done."),
Err(e) => {
let msg = format!(" ERROR in digest auto: {}", e);
log_line(&mut log_buf, &msg);
eprintln!("{}", msg);
}
}
// --- Step 5: Apply digest links ---
log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---");
on_progress("applying digest links");
println!("\n--- Applying digest links ---");
*store = Store::load()?;
let links = digest::parse_all_digest_links(store);
let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links);
store.save()?;
log_line(&mut log_buf, &format!(" {} links applied, {} skipped, {} fallbacks",
applied, skipped, fallbacks));
// --- Step 6: Summary ---
let elapsed = start.elapsed();
log_line(&mut log_buf, "\n--- Summary ---");
log_line(&mut log_buf, &format!("Finished: {}", store::format_datetime(store::now_epoch())));
log_line(&mut log_buf, &format!("Duration: {:.0}s", elapsed.as_secs_f64()));
*store = Store::load()?;
log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len()));
let summary = format!(
"\n=== CONSOLIDATE FULL COMPLETE ===\n\
Duration: {:.0}s\n\
Agents: {} run, {} errors\n\
Nodes: {} Relations: {}\n",
elapsed.as_secs_f64(),
agent_num - agent_errors, agent_errors,
store.nodes.len(), store.relations.len(),
);
log_line(&mut log_buf, &summary);
println!("{}", summary);
// Store the log as a node
store.upsert_provenance(&log_key, &log_buf,
store::Provenance::AgentConsolidate).ok();
store.save()?;
Ok(())
}
/// Find the most recent set of consolidation report keys from the store.
fn find_consolidation_reports(store: &Store) -> Vec<String> {
let mut keys: Vec<&String> = store.nodes.keys()
.filter(|k| k.starts_with("_consolidation-"))
.collect();
keys.sort();
keys.reverse();
if keys.is_empty() { return Vec::new(); }
// Group by timestamp (last segment after last '-')
let latest_ts = keys[0].rsplit('-').next().unwrap_or("").to_string();
keys.into_iter()
.filter(|k| k.ends_with(&latest_ts))
.cloned()
.collect()
}
fn build_consolidation_prompt(store: &Store, report_keys: &[String]) -> Result<String, String> {
let mut report_text = String::new();
for key in report_keys {
let content = store.nodes.get(key)
.map(|n| n.content.as_str())
.unwrap_or("");
report_text.push_str(&format!("\n{}\n## Report: {}\n\n{}\n",
"=".repeat(60), key, content));
}
super::prompts::load_prompt("consolidation", &[("{{REPORTS}}", &report_text)])
}
/// Run the full apply-consolidation pipeline.
pub fn apply_consolidation(store: &mut Store, do_apply: bool, report_key: Option<&str>) -> Result<(), String> {
let reports = if let Some(key) = report_key {
vec![key.to_string()]
} else {
find_consolidation_reports(store)
};
if reports.is_empty() {
println!("No consolidation reports found.");
println!("Run consolidation-agents first.");
return Ok(());
}
println!("Found {} reports:", reports.len());
for r in &reports {
println!(" {}", r);
}
println!("\nExtracting actions from reports...");
let prompt = build_consolidation_prompt(store, &reports)?;
println!(" Prompt: {} chars", prompt.len());
let response = call_sonnet("consolidate", &prompt)?;
let actions_value = parse_json_response(&response)?;
let actions = actions_value.as_array()
.ok_or("expected JSON array of actions")?;
println!(" {} actions extracted", actions.len());
// Store actions in the store
let timestamp = store::format_datetime(store::now_epoch())
.replace([':', '-'], "");
let actions_key = format!("_consolidation-actions-{}", timestamp);
let actions_json = serde_json::to_string_pretty(&actions_value).unwrap();
store.upsert_provenance(&actions_key, &actions_json,
store::Provenance::AgentConsolidate).ok();
println!(" Stored: {}", actions_key);
let link_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("link"))
.collect();
let manual_actions: Vec<_> = actions.iter()
.filter(|a| a.get("action").and_then(|v| v.as_str()) == Some("manual"))
.collect();
if !do_apply {
// Dry run
println!("\n{}", "=".repeat(60));
println!("DRY RUN — {} actions proposed", actions.len());
println!("{}\n", "=".repeat(60));
if !link_actions.is_empty() {
println!("## Links to add ({})\n", link_actions.len());
for (i, a) in link_actions.iter().enumerate() {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("?");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("?");
let reason = a.get("reason").and_then(|v| v.as_str()).unwrap_or("");
println!(" {:2}. {}{} ({})", i + 1, src, tgt, reason);
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions needed ({})\n", manual_actions.len());
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
}
}
println!("\n{}", "=".repeat(60));
println!("To apply: poc-memory apply-consolidation --apply");
println!("{}", "=".repeat(60));
return Ok(());
}
// Apply
let mut applied = 0usize;
let mut skipped = 0usize;
if !link_actions.is_empty() {
println!("\nApplying {} links...", link_actions.len());
for a in &link_actions {
let src = a.get("source").and_then(|v| v.as_str()).unwrap_or("");
let tgt = a.get("target").and_then(|v| v.as_str()).unwrap_or("");
if src.is_empty() || tgt.is_empty() { skipped += 1; continue; }
let source = match store.resolve_key(src) {
Ok(s) => s,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
let target = match store.resolve_key(tgt) {
Ok(t) => t,
Err(e) => { println!(" ? {}{}: {}", src, tgt, e); skipped += 1; continue; }
};
// Refine target to best-matching section
let source_content = store.nodes.get(&source)
.map(|n| n.content.as_str()).unwrap_or("");
let target = neuro::refine_target(store, source_content, &target);
let exists = store.relations.iter().any(|r|
r.source_key == source && r.target_key == target && !r.deleted
);
if exists { skipped += 1; continue; }
let source_uuid = match store.nodes.get(&source) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let target_uuid = match store.nodes.get(&target) { Some(n) => n.uuid, None => { skipped += 1; continue; } };
let rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Auto,
0.5,
&source, &target,
);
if store.add_relation(rel).is_ok() {
println!(" + {}{}", source, target);
applied += 1;
}
}
}
if !manual_actions.is_empty() {
println!("\n## Manual actions (not auto-applied):\n");
for a in &manual_actions {
let prio = a.get("priority").and_then(|v| v.as_str()).unwrap_or("?");
let desc = a.get("description").and_then(|v| v.as_str()).unwrap_or("?");
println!(" [{}] {}", prio, desc);
}
}
if applied > 0 {
store.save()?;
}
println!("\n{}", "=".repeat(60));
println!("Applied: {} Skipped: {} Manual: {}", applied, skipped, manual_actions.len());
println!("{}", "=".repeat(60));
Ok(())
}

File diff suppressed because it is too large


@@ -0,0 +1,495 @@
// Episodic digest generation: daily, weekly, monthly, auto
//
// Three digest levels form a temporal hierarchy: daily digests summarize
// journal entries, weekly digests summarize dailies, monthly digests
// summarize weeklies. All three share the same generate/auto-detect
// pipeline, parameterized by DigestLevel.
use super::llm::{call_sonnet, semantic_keys};
use crate::store::{self, Store, new_relation};
use crate::neuro;
use chrono::{Datelike, Duration, Local, NaiveDate};
use regex::Regex;
use std::collections::BTreeSet;
// --- Digest level descriptors ---
#[allow(clippy::type_complexity)]
struct DigestLevel {
name: &'static str,
title: &'static str,
period: &'static str,
input_title: &'static str,
child_name: Option<&'static str>, // None = journal (leaf), Some = child digest files
/// Expand an arg into (canonical_label, dates covered).
label_dates: fn(&str) -> Result<(String, Vec<String>), String>,
/// Map a YYYY-MM-DD date to this level's label.
date_to_label: fn(&str) -> Option<String>,
}
const DAILY: DigestLevel = DigestLevel {
name: "daily",
title: "Daily",
period: "Date",
input_title: "Journal entries",
child_name: None,
label_dates: |date| Ok((date.to_string(), vec![date.to_string()])),
date_to_label: |date| Some(date.to_string()),
};
/// Week label and 7 dates (Mon-Sun) for the week containing `date`.
fn week_dates(date: &str) -> Result<(String, Vec<String>), String> {
let nd = NaiveDate::parse_from_str(date, "%Y-%m-%d")
.map_err(|e| format!("bad date '{}': {}", date, e))?;
let iso = nd.iso_week();
let week_label = format!("{}-W{:02}", iso.year(), iso.week());
let monday = nd - Duration::days(nd.weekday().num_days_from_monday() as i64);
let dates = (0..7)
.map(|i| (monday + Duration::days(i)).format("%Y-%m-%d").to_string())
.collect();
Ok((week_label, dates))
}
const WEEKLY: DigestLevel = DigestLevel {
name: "weekly",
title: "Weekly",
period: "Week",
input_title: "Daily digests",
child_name: Some("daily"),
label_dates: |arg| {
if !arg.contains('W') {
return week_dates(arg);
}
let (y, w) = arg.split_once("-W")
.ok_or_else(|| format!("bad week label: {}", arg))?;
let year: i32 = y.parse().map_err(|_| format!("bad week year: {}", arg))?;
let week: u32 = w.parse().map_err(|_| format!("bad week number: {}", arg))?;
let monday = NaiveDate::from_isoywd_opt(year, week, chrono::Weekday::Mon)
.ok_or_else(|| format!("invalid week: {}", arg))?;
let dates = (0..7)
.map(|i| (monday + Duration::days(i)).format("%Y-%m-%d").to_string())
.collect();
Ok((arg.to_string(), dates))
},
date_to_label: |date| week_dates(date).ok().map(|(l, _)| l),
};
const MONTHLY: DigestLevel = DigestLevel {
name: "monthly",
title: "Monthly",
period: "Month",
input_title: "Weekly digests",
child_name: Some("weekly"),
label_dates: |arg| {
let (year, month) = if arg.len() <= 7 {
let d = NaiveDate::parse_from_str(&format!("{}-01", arg), "%Y-%m-%d")
.map_err(|e| format!("bad month '{}': {}", arg, e))?;
(d.year(), d.month())
} else {
let d = NaiveDate::parse_from_str(arg, "%Y-%m-%d")
.map_err(|e| format!("bad date '{}': {}", arg, e))?;
(d.year(), d.month())
};
let label = format!("{}-{:02}", year, month);
let mut dates = Vec::new();
let mut day = 1u32;
while let Some(date) = NaiveDate::from_ymd_opt(year, month, day) {
if date.month() != month { break; }
dates.push(date.format("%Y-%m-%d").to_string());
day += 1;
}
Ok((label, dates))
},
date_to_label: |date| NaiveDate::parse_from_str(date, "%Y-%m-%d")
.ok().map(|d| format!("{}-{:02}", d.year(), d.month())),
};
const LEVELS: &[&DigestLevel] = &[&DAILY, &WEEKLY, &MONTHLY];
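// Illustrative walk-through: for the date "2026-03-04", DAILY yields the
// label "2026-03-04" and that single date; WEEKLY yields "2026-W10" plus
// the seven dates Mon 2026-03-02 .. Sun 2026-03-08; MONTHLY yields
// "2026-03" plus all 31 March dates.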
/// Store key for a digest node: "daily-2026-03-04", "weekly-2026-W09", etc.
fn digest_node_key(level_name: &str, label: &str) -> String {
format!("{}-{}", level_name, label)
}
// --- Input gathering ---
/// Load child digest content from the store.
fn load_child_digests(store: &Store, prefix: &str, labels: &[String]) -> Vec<(String, String)> {
let mut digests = Vec::new();
for label in labels {
let key = digest_node_key(prefix, label);
if let Some(node) = store.nodes.get(&key) {
digests.push((label.clone(), node.content.clone()));
}
}
digests
}
/// Unified: gather inputs for any digest level.
fn gather(level: &DigestLevel, store: &Store, arg: &str) -> Result<(String, Vec<(String, String)>), String> {
let (label, dates) = (level.label_dates)(arg)?;
let inputs = if let Some(child_name) = level.child_name {
// Map parent's dates through child's date_to_label → child labels
let child = LEVELS.iter()
.find(|l| l.name == child_name)
.expect("invalid child_name");
let child_labels: Vec<String> = dates.iter()
.filter_map(|d| (child.date_to_label)(d))
.collect::<BTreeSet<_>>()
.into_iter()
.collect();
load_child_digests(store, child_name, &child_labels)
} else {
// Leaf level: scan store for episodic entries matching date
let mut entries: Vec<_> = store.nodes.values()
.filter(|n| n.node_type == store::NodeType::EpisodicSession
&& n.timestamp > 0
&& store::format_date(n.timestamp) == label)
.map(|n| {
(store::format_datetime(n.timestamp), n.content.clone())
})
.collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
entries
};
Ok((label, inputs))
}
/// Unified: find candidate labels for auto-generation (past, not yet generated).
fn find_candidates(level: &DigestLevel, dates: &[String], today: &str) -> Vec<String> {
let today_label = (level.date_to_label)(today);
dates.iter()
.filter_map(|d| (level.date_to_label)(d))
.collect::<BTreeSet<_>>()
.into_iter()
.filter(|l| Some(l) != today_label.as_ref())
.collect()
}
// --- Unified generator ---
fn format_inputs(inputs: &[(String, String)], daily: bool) -> String {
let mut text = String::new();
for (label, content) in inputs {
if daily {
text.push_str(&format!("\n### {}\n\n{}\n", label, content));
} else {
text.push_str(&format!("\n---\n## {}\n{}\n", label, content));
}
}
text
}
fn generate_digest(
store: &mut Store,
level: &DigestLevel,
label: &str,
inputs: &[(String, String)],
) -> Result<(), String> {
println!("Generating {} digest for {}...", level.name, label);
if inputs.is_empty() {
println!(" No inputs found for {}", label);
return Ok(());
}
println!(" {} inputs", inputs.len());
let keys = semantic_keys(store);
let keys_text = keys.iter()
.map(|k| format!(" - {}", k))
.collect::<Vec<_>>()
.join("\n");
let content = format_inputs(inputs, level.child_name.is_none());
let covered = inputs.iter()
.map(|(l, _)| l.as_str())
.collect::<Vec<_>>()
.join(", ");
let prompt = super::prompts::load_prompt("digest", &[
("{{LEVEL}}", level.title),
("{{PERIOD}}", level.period),
("{{INPUT_TITLE}}", level.input_title),
("{{LABEL}}", label),
("{{CONTENT}}", &content),
("{{COVERED}}", &covered),
("{{KEYS}}", &keys_text),
])?;
println!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4);
println!(" Calling Sonnet...");
let digest = call_sonnet("digest", &prompt)?;
let key = digest_node_key(level.name, label);
store.upsert_provenance(&key, &digest, store::Provenance::AgentDigest)?;
store.save()?;
println!(" Stored: {}", key);
println!(" Done: {} lines", digest.lines().count());
Ok(())
}
// --- Public API ---
pub fn generate(store: &mut Store, level_name: &str, arg: &str) -> Result<(), String> {
let level = LEVELS.iter()
.find(|l| l.name == level_name)
.ok_or_else(|| format!("unknown digest level: {}", level_name))?;
let (label, inputs) = gather(level, store, arg)?;
generate_digest(store, level, &label, &inputs)
}
// --- Auto-detect and generate missing digests ---
pub fn digest_auto(store: &mut Store) -> Result<(), String> {
let today = Local::now().format("%Y-%m-%d").to_string();
// Collect all dates with episodic entries
let dates: Vec<String> = store.nodes.values()
.filter(|n| n.node_type == store::NodeType::EpisodicSession && n.timestamp > 0)
.map(|n| store::format_date(n.timestamp))
.collect::<BTreeSet<_>>()
.into_iter()
.collect();
let mut total = 0u32;
for level in LEVELS {
let candidates = find_candidates(level, &dates, &today);
let mut generated = 0u32;
let mut skipped = 0u32;
for arg in &candidates {
let (label, inputs) = gather(level, store, arg)?;
let key = digest_node_key(level.name, &label);
if store.nodes.contains_key(&key) {
skipped += 1;
continue;
}
if inputs.is_empty() { continue; }
println!("[auto] Missing {} digest for {}", level.name, label);
generate_digest(store, level, &label, &inputs)?;
generated += 1;
}
println!("[auto] {}: {} generated, {} existed", level.name, generated, skipped);
total += generated;
}
if total == 0 {
println!("[auto] All digests up to date.");
} else {
println!("[auto] Generated {} total digests.", total);
}
Ok(())
}
// --- Digest link parsing ---
// Replaces digest-link-parser.py: parses ## Links sections from digest
// files and applies them to the memory graph.
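// A digest's Links section might look like this (keys illustrative):
//
// ## Links
// - daily-2026-03-04 → bcachefs/btree (debugging session)
// - this weekly ↔ memory/design (shaped the digest format)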
/// A parsed link from a digest's Links section.
pub struct DigestLink {
pub source: String,
pub target: String,
pub reason: String,
pub file: String,
}
/// Normalize a raw link target to a poc-memory key.
fn normalize_link_key(raw: &str) -> String {
let key = raw.trim().trim_matches('`').trim();
if key.is_empty() { return String::new(); }
// Self-references
let lower = key.to_lowercase();
if lower.starts_with("this ") { return String::new(); }
let mut key = key.to_string();
// Strip .md suffix if present
if let Some(stripped) = key.strip_suffix(".md") {
key = stripped.to_string();
} else if key.contains('#') {
let (file, section) = key.split_once('#').unwrap();
if let Some(bare) = file.strip_suffix(".md") {
key = format!("{}#{}", bare, section);
}
}
// weekly/2026-W06 → weekly-2026-W06, etc.
if let Some(pos) = key.find('/') {
let prefix = &key[..pos];
if prefix == "daily" || prefix == "weekly" || prefix == "monthly" {
let rest = &key[pos + 1..];
key = format!("{}-{}", prefix, rest);
}
}
// Bare date → daily digest
let date_re = Regex::new(r"^\d{4}-\d{2}-\d{2}$").unwrap();
if date_re.is_match(&key) {
key = format!("daily-{}", key);
}
key
}
/// Parse the Links section from a digest node's content.
fn parse_digest_node_links(key: &str, content: &str) -> Vec<DigestLink> {
let link_re = Regex::new(r"^-\s+(.+?)\s*[→↔←]\s*(.+?)(?:\s*\((.+?)\))?\s*$").unwrap();
let header_re = Regex::new(r"^##\s+Links").unwrap();
let mut links = Vec::new();
let mut in_links = false;
for line in content.lines() {
if header_re.is_match(line) {
in_links = true;
continue;
}
if in_links && line.starts_with("## ") {
in_links = false;
continue;
}
if !in_links { continue; }
if line.starts_with("###") || line.starts_with("**") { continue; }
if let Some(cap) = link_re.captures(line) {
let raw_source = cap[1].trim();
let raw_target = cap[2].trim();
let reason = cap.get(3).map(|m| m.as_str().to_string()).unwrap_or_default();
let mut source = normalize_link_key(raw_source);
let mut target = normalize_link_key(raw_target);
// Replace self-references with digest key
if source.is_empty() { source = key.to_string(); }
if target.is_empty() { target = key.to_string(); }
// Handle "this daily/weekly/monthly" in raw text
let raw_s_lower = raw_source.to_lowercase();
let raw_t_lower = raw_target.to_lowercase();
if raw_s_lower.contains("this daily") || raw_s_lower.contains("this weekly")
|| raw_s_lower.contains("this monthly")
{
source = key.to_string();
}
if raw_t_lower.contains("this daily") || raw_t_lower.contains("this weekly")
|| raw_t_lower.contains("this monthly")
{
target = key.to_string();
}
// Skip NEW: and self-links
if source.starts_with("NEW:") || target.starts_with("NEW:") { continue; }
if source == target { continue; }
links.push(DigestLink { source, target, reason, file: key.to_string() });
}
}
links
}
/// Parse links from all digest nodes in the store.
pub fn parse_all_digest_links(store: &Store) -> Vec<DigestLink> {
let mut all_links = Vec::new();
let mut digest_keys: Vec<&String> = store.nodes.iter()
.filter(|(_, n)| matches!(n.node_type,
store::NodeType::EpisodicDaily
| store::NodeType::EpisodicWeekly
| store::NodeType::EpisodicMonthly))
.map(|(k, _)| k)
.collect();
digest_keys.sort();
for key in digest_keys {
if let Some(node) = store.nodes.get(key) {
all_links.extend(parse_digest_node_links(key, &node.content));
}
}
// Deduplicate by (source, target) pair
let mut seen = std::collections::HashSet::new();
all_links.retain(|link| seen.insert((link.source.clone(), link.target.clone())));
all_links
}
/// Apply parsed digest links to the store.
pub fn apply_digest_links(store: &mut Store, links: &[DigestLink]) -> (usize, usize, usize) {
let mut applied = 0usize;
let mut skipped = 0usize;
let mut fallbacks = 0usize;
for link in links {
// Try resolving both keys
let source = match store.resolve_key(&link.source) {
Ok(s) => s,
Err(_) => {
// Try stripping section anchor as fallback
if let Some(base) = link.source.split('#').next() {
match store.resolve_key(base) {
Ok(s) => { fallbacks += 1; s }
Err(_) => { skipped += 1; continue; }
}
} else {
skipped += 1; continue;
}
}
};
let target = match store.resolve_key(&link.target) {
Ok(t) => t,
Err(_) => {
if let Some(base) = link.target.split('#').next() {
match store.resolve_key(base) {
Ok(t) => { fallbacks += 1; t }
Err(_) => { skipped += 1; continue; }
}
} else {
skipped += 1; continue;
}
}
};
// Refine target to best-matching section if available
let source_content = store.nodes.get(&source)
.map(|n| n.content.as_str()).unwrap_or("");
let target = neuro::refine_target(store, source_content, &target);
if source == target { skipped += 1; continue; }
// Check if link already exists
let exists = store.relations.iter().any(|r|
r.source_key == source && r.target_key == target && !r.deleted
);
if exists { skipped += 1; continue; }
let source_uuid = match store.nodes.get(&source) {
Some(n) => n.uuid,
None => { skipped += 1; continue; }
};
let target_uuid = match store.nodes.get(&target) {
Some(n) => n.uuid,
None => { skipped += 1; continue; }
};
let rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Link,
0.5,
&source, &target,
);
if store.add_relation(rel).is_ok() {
println!(" + {}{}", source, target);
applied += 1;
}
}
(applied, skipped, fallbacks)
}


@@ -0,0 +1,437 @@
// Journal enrichment and experience mining
//
// Two modes of processing conversation transcripts:
// journal_enrich — enrich a specific journal entry with source location and links
// experience_mine — retroactively find experiential moments not yet journaled
//
// Both extract conversation from JSONL transcripts, build prompts, call Sonnet,
// and apply results to the store.
use super::llm::{call_sonnet, parse_json_response, semantic_keys};
use crate::neuro;
use crate::store::{self, Store, new_node, new_relation};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::fs;
use std::hash::{Hash, Hasher};
use crate::store::StoreView;
use crate::util::parse_timestamp_to_epoch;
/// Compute the store dedup key for a transcript file.
/// This is the same key experience_mine uses to mark a transcript as mined.
pub fn transcript_dedup_key(path: &str) -> Result<String, String> {
let bytes = fs::read(path).map_err(|e| format!("read {}: {}", path, e))?;
let mut hasher = DefaultHasher::new();
bytes.hash(&mut hasher);
Ok(format!("_mined-transcripts#h-{:016x}", hasher.finish()))
}
/// Check if a transcript has already been mined (dedup key exists in store).
pub fn is_transcript_mined(store: &impl StoreView, path: &str) -> bool {
match transcript_dedup_key(path) {
Ok(key) => store.node_content(&key).is_some(),
Err(_) => false,
}
}
/// Dedup key for a transcript based on its filename (UUID).
/// Used by the daemon reconcile loop — no file reads needed.
pub fn transcript_filename_key(path: &str) -> String {
let filename = std::path::Path::new(path)
.file_stem()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| path.to_string());
format!("_mined-transcripts#f-{}", filename)
}
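// e.g. "/transcripts/0f3a9c.jsonl" → "_mined-transcripts#f-0f3a9c"
// (hypothetical path; real transcripts are UUID-named JSONL files).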
/// Get the set of all mined transcript keys (both content-hash and filename)
/// from the store. Load once per daemon tick, check many.
pub fn mined_transcript_keys() -> HashSet<String> {
use crate::store::AnyView;
let Ok(view) = AnyView::load() else { return HashSet::new() };
let mut keys = HashSet::new();
view.for_each_node(|key, _, _| {
if key.starts_with("_mined-transcripts#") {
keys.insert(key.to_string());
}
});
keys
}
/// Check if a transcript has been mined, given a pre-loaded set of mined keys.
/// Checks filename-based key only (no file read). Sessions mined before the
/// filename key was added will pass through and short-circuit in experience_mine
/// via the content hash check — a one-time cost on first restart after this change.
pub fn is_transcript_mined_with_keys(mined: &HashSet<String>, path: &str) -> bool {
mined.contains(&transcript_filename_key(path))
}
/// Extract user/assistant messages with line numbers from a JSONL transcript.
/// (line_number, role, text, timestamp)
pub fn extract_conversation(jsonl_path: &str) -> Result<Vec<(usize, String, String, String)>, String> {
let content = fs::read_to_string(jsonl_path)
.map_err(|e| format!("read {}: {}", jsonl_path, e))?;
let mut messages = Vec::new();
for (i, line) in content.lines().enumerate() {
let obj: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
if msg_type != "user" && msg_type != "assistant" { continue; }
let timestamp = obj.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let msg = obj.get("message").unwrap_or(&obj);
let content = msg.get("content");
let text = match content {
Some(serde_json::Value::String(s)) => s.clone(),
Some(serde_json::Value::Array(arr)) => {
arr.iter()
.filter_map(|c| {
// Only extract text blocks; skip tool_use, tool_result, thinking, etc.
let is_text = c.get("type").and_then(|v| v.as_str()) == Some("text");
if is_text {
c.get("text").and_then(|v| v.as_str()).map(|s| s.to_string())
} else {
c.as_str().map(|s| s.to_string())
}
})
.collect::<Vec<_>>()
.join("\n")
}
_ => continue,
};
let text = text.trim().to_string();
if text.is_empty() { continue; }
messages.push((i + 1, msg_type.to_string(), text, timestamp));
}
Ok(messages)
}
pub const COMPACTION_MARKER: &str = "This session is being continued from a previous conversation that ran out of context";
/// Split extracted messages into segments at compaction boundaries.
/// Each segment represents one continuous conversation before context was compacted.
pub fn split_on_compaction(messages: Vec<(usize, String, String, String)>) -> Vec<Vec<(usize, String, String, String)>> {
let mut segments: Vec<Vec<(usize, String, String, String)>> = Vec::new();
let mut current = Vec::new();
for msg in messages {
if msg.1 == "user" && msg.2.starts_with(COMPACTION_MARKER) {
if !current.is_empty() {
segments.push(current);
current = Vec::new();
}
// The continuation message itself is part of the new segment
current.push(msg);
} else {
current.push(msg);
}
}
if !current.is_empty() {
segments.push(current);
}
segments
}
/// Format conversation messages for the prompt (truncating long messages).
fn format_conversation(messages: &[(usize, String, String, String)]) -> String {
messages.iter()
.map(|(line, role, text, ts)| {
let text = crate::util::truncate(text, 1800, "...[truncated]");
if ts.is_empty() {
format!("L{} [{}]: {}", line, role, text)
} else {
format!("L{} [{}] {}: {}", line, role, &ts[..ts.len().min(19)], text)
}
})
.collect::<Vec<_>>()
.join("\n\n")
}
fn build_journal_prompt(
entry_text: &str,
conversation: &str,
keys: &[String],
grep_line: usize,
) -> Result<String, String> {
let keys_text: String = keys.iter()
.map(|k| format!(" - {}", k))
.collect::<Vec<_>>()
.join("\n");
super::prompts::load_prompt("journal-enrich", &[
("{{GREP_LINE}}", &grep_line.to_string()),
("{{ENTRY_TEXT}}", entry_text),
("{{KEYS}}", &keys_text),
("{{CONVERSATION}}", conversation),
])
}
/// Enrich a journal entry with conversation context and link proposals.
pub fn journal_enrich(
store: &mut Store,
jsonl_path: &str,
entry_text: &str,
grep_line: usize,
) -> Result<(), String> {
println!("Extracting conversation from {}...", jsonl_path);
let messages = extract_conversation(jsonl_path)?;
let conversation = format_conversation(&messages);
println!(" {} messages, {} chars", messages.len(), conversation.len());
let keys = semantic_keys(store);
println!(" {} semantic keys", keys.len());
let prompt = build_journal_prompt(entry_text, &conversation, &keys, grep_line)?;
println!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4);
println!(" Calling Sonnet...");
let response = call_sonnet("enrich", &prompt)?;
let result = parse_json_response(&response)?;
// Report results
let source_start = result.get("source_start").and_then(|v| v.as_u64()).unwrap_or(0);
let source_end = result.get("source_end").and_then(|v| v.as_u64()).unwrap_or(0);
let links = result.get("links").and_then(|v| v.as_array());
let insights = result.get("missed_insights").and_then(|v| v.as_array());
println!(" Source: L{}-L{}", source_start, source_end);
println!(" Links: {}", links.map_or(0, |l| l.len()));
println!(" Missed insights: {}", insights.map_or(0, |l| l.len()));
// Apply links
if let Some(links) = links {
for link in links {
let target = link.get("target").and_then(|v| v.as_str()).unwrap_or("");
let reason = link.get("reason").and_then(|v| v.as_str()).unwrap_or("");
if target.is_empty() || target.starts_with("NOTE:") {
if let Some(note) = target.strip_prefix("NOTE:") {
println!(" NOTE: {}{}", note, reason);
}
continue;
}
// Resolve target and find journal node
let resolved = match store.resolve_key(target) {
Ok(r) => r,
Err(_) => { println!(" SKIP {} (not in graph)", target); continue; }
};
let source_key = match store.find_journal_node(entry_text) {
Some(k) => k,
None => { println!(" SKIP {} (no matching journal node)", target); continue; }
};
// Refine target to best-matching section
let source_content = store.nodes.get(&source_key)
.map(|n| n.content.as_str()).unwrap_or("");
let resolved = neuro::refine_target(store, source_content, &resolved);
let source_uuid = match store.nodes.get(&source_key) {
Some(n) => n.uuid,
None => continue,
};
let target_uuid = match store.nodes.get(&resolved) {
Some(n) => n.uuid,
None => continue,
};
let rel = new_relation(
source_uuid, target_uuid,
store::RelationType::Link,
0.5,
&source_key, &resolved,
);
if store.add_relation(rel).is_ok() {
println!(" LINK {}{} ({})", source_key, resolved, reason);
}
}
}
store.save()?;
Ok(())
}
/// Mine a conversation transcript for experiential moments not yet journaled.
/// If `segment` is Some, only process that compaction segment of the file.
pub fn experience_mine(
store: &mut Store,
jsonl_path: &str,
segment: Option<usize>,
) -> Result<usize, String> {
println!("Experience mining: {}", jsonl_path);
// Transcript-level dedup: hash the file content and check if already mined
let transcript_bytes = fs::read(jsonl_path)
.map_err(|e| format!("reading transcript: {}", e))?;
let mut hasher = DefaultHasher::new();
transcript_bytes.hash(&mut hasher);
let hash = hasher.finish();
let dedup_key = format!("_mined-transcripts#h-{:016x}", hash);
if store.nodes.contains_key(&dedup_key) {
// Backfill filename key if missing (transcripts mined before this key existed)
let fname_key = transcript_filename_key(jsonl_path);
if !store.nodes.contains_key(&fname_key) {
let mut node = new_node(&fname_key, &format!("Backfilled from {}", dedup_key));
node.provenance = store::Provenance::AgentExperienceMine;
let _ = store.upsert_node(node);
store.save()?;
}
println!(" Already mined this transcript ({}), skipping.", &dedup_key[24..]);
return Ok(0);
}
let all_messages = extract_conversation(jsonl_path)?;
// If segment is specified, extract just that segment; otherwise process all messages
let messages = match segment {
Some(idx) => {
let segments = split_on_compaction(all_messages);
segments.into_iter().nth(idx)
.ok_or_else(|| format!("segment {} out of range", idx))?
}
None => all_messages,
};
let conversation = format_conversation(&messages);
println!(" {} messages, {} chars", messages.len(), conversation.len());
// Load core identity nodes for context
let cfg = crate::config::get();
let identity: String = cfg.core_nodes.iter()
.filter_map(|k| store.nodes.get(k).map(|n| n.content.as_str()))
.collect::<Vec<_>>()
.join("\n\n");
// Get recent episodic entries to avoid duplication
let mut journal: Vec<_> = store.nodes.values()
.filter(|node| matches!(node.node_type, store::NodeType::EpisodicSession))
.collect();
journal.sort_by_key(|n| n.timestamp);
let recent: String = journal.iter().rev().take(10)
.map(|n| format!("---\n{}\n", n.content))
.collect();
let keys = semantic_keys(store);
let keys_text: String = keys.iter()
.map(|k| format!(" - {}", k))
.collect::<Vec<_>>()
.join("\n");
let prompt = super::prompts::load_prompt("experience", &[
("{{IDENTITY}}", &identity),
("{{RECENT_JOURNAL}}", &recent),
("{{KEYS}}", &keys_text),
("{{CONVERSATION}}", &conversation),
])?;
let est_tokens = prompt.len() / 4;
println!(" Prompt: {} chars (~{} tokens)", prompt.len(), est_tokens);
if est_tokens > 150_000 {
println!(" Skipping: prompt too large ({} tokens > 150k limit)", est_tokens);
return Ok(0);
}
println!(" Calling Sonnet...");
let response = call_sonnet("experience-mine", &prompt)?;
let entries = parse_json_response(&response)?;
let entries = match entries.as_array() {
Some(arr) => arr.clone(),
None => return Err("expected JSON array".to_string()),
};
if entries.is_empty() {
println!(" No missed experiences found.");
} else {
println!(" Found {} experiential moments:", entries.len());
}
let mut count = 0;
for entry in &entries {
let ts = entry.get("timestamp").and_then(|v| v.as_str()).unwrap_or("");
let content = entry.get("content").and_then(|v| v.as_str()).unwrap_or("");
if content.is_empty() { continue; }
// Format with timestamp header
let full_content = if ts.is_empty() {
content.to_string()
} else {
format!("## {}\n\n{}", ts, content)
};
// Generate key from timestamp
let key_slug: String = content.chars()
.filter(|c| c.is_alphanumeric() || *c == ' ')
.take(50)
.collect::<String>()
.trim()
.to_lowercase()
.replace(' ', "-");
let key = if ts.is_empty() {
format!("journal#j-mined-{}", key_slug)
} else {
format!("journal#j-{}-{}", ts.to_lowercase().replace(':', "-"), key_slug)
};
// Check for duplicate
if store.nodes.contains_key(&key) {
println!(" SKIP {} (duplicate)", key);
continue;
}
// Write to store — use event timestamp, not mining time
let mut node = new_node(&key, &full_content);
node.node_type = store::NodeType::EpisodicSession;
node.provenance = store::Provenance::AgentExperienceMine;
if !ts.is_empty() {
if let Some(epoch) = parse_timestamp_to_epoch(ts) {
node.created_at = epoch;
}
}
let _ = store.upsert_node(node);
count += 1;
let preview = crate::util::truncate(content, 77, "...");
println!(" + [{}] {}", ts, preview);
}
// Record this transcript/segment as mined (even if count == 0, to prevent re-runs)
let fname_key = match segment {
Some(idx) => format!("{}.{}", transcript_filename_key(jsonl_path), idx),
None => transcript_filename_key(jsonl_path),
};
let dedup_content = format!("Mined {} ({} entries)", jsonl_path, count);
let mut fname_node = new_node(&fname_key, &dedup_content);
fname_node.provenance = store::Provenance::AgentExperienceMine;
let _ = store.upsert_node(fname_node);
// For unsegmented calls, also write the content-hash key for backwards compat
if segment.is_none() {
let mut dedup_node = new_node(&dedup_key, &dedup_content);
dedup_node.provenance = store::Provenance::AgentExperienceMine;
let _ = store.upsert_node(dedup_node);
}
if count > 0 {
println!(" Saved {} new journal entries.", count);
}
store.save()?;
println!("Done: {} new entries mined.", count);
Ok(count)
}


@@ -0,0 +1,338 @@
// fact_mine.rs — extract atomic factual claims from conversation transcripts
//
// Chunks conversation text into overlapping windows, sends each to Haiku
// for extraction, deduplicates by claim text. Output: JSON array of facts.
//
// Uses Haiku (not Sonnet) for cost efficiency on high-volume extraction.
use crate::config;
use super::llm;
use crate::store::{self, Provenance};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs;
use std::path::Path;
const CHARS_PER_TOKEN: usize = 4;
const WINDOW_TOKENS: usize = 2000;
const OVERLAP_TOKENS: usize = 200;
const WINDOW_CHARS: usize = WINDOW_TOKENS * CHARS_PER_TOKEN;
const OVERLAP_CHARS: usize = OVERLAP_TOKENS * CHARS_PER_TOKEN;
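// Net effect: each window is ~8,000 chars (~2,000 tokens), and consecutive
// windows share ~800 chars of overlap, so a fact straddling a window edge
// is still seen whole by at least one window.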
fn extraction_prompt() -> String {
let cfg = config::get();
format!(
r#"Extract atomic factual claims from this conversation excerpt.
Speakers are labeled [{user}] and [{assistant}] in the transcript.
Use their proper names in claims, not "the user" or "the assistant."
Each claim should be:
- A single verifiable statement
- Specific enough to be useful in isolation
- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal,
bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences,
linux/kernel, memory/design, identity/personal)
- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows),
or "speculative" (hypothesis, not confirmed)
- Include which speaker said it ("{user}", "{assistant}", or "Unknown")
Do NOT extract:
- Opinions or subjective assessments
- Conversational filler or greetings
- Things that are obviously common knowledge
- Restatements of the same fact (pick the clearest version)
- System messages, tool outputs, or error logs (extract what was LEARNED from them)
- Anything about the conversation itself ("{user} and {assistant} discussed...")
- Facts only relevant to this specific conversation (e.g. transient file paths, mid-debug state)
Output as a JSON array. Each element:
{{
"claim": "the exact factual statement",
"domain": "category/subcategory",
"confidence": "stated|implied|speculative",
"speaker": "{user}|{assistant}|Unknown"
}}
If the excerpt contains no extractable facts, output an empty array: []
--- CONVERSATION EXCERPT ---
"#, user = cfg.user_name, assistant = cfg.assistant_name)
}
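// One element of the expected output, with illustrative values:
//
// {
// "claim": "bcachefs journals btree updates before they are applied",
// "domain": "bcachefs/journal",
// "confidence": "stated",
// "speaker": "Unknown"
// }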
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
pub claim: String,
pub domain: String,
pub confidence: String,
pub speaker: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_file: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_chunk: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_offset: Option<usize>,
}
struct Message {
role: String,
text: String,
timestamp: String,
}
/// Extract user/assistant text messages from a JSONL transcript.
fn extract_conversation(path: &Path) -> Vec<Message> {
let cfg = config::get();
let Ok(content) = fs::read_to_string(path) else { return Vec::new() };
let mut messages = Vec::new();
for line in content.lines() {
let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else { continue };
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
if msg_type != "user" && msg_type != "assistant" {
continue;
}
let timestamp = obj.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let msg = obj.get("message").unwrap_or(&obj);
let content = msg.get("content");
let text = match content {
Some(serde_json::Value::String(s)) => s.clone(),
Some(serde_json::Value::Array(arr)) => {
let texts: Vec<&str> = arr.iter()
.filter_map(|block| {
let obj = block.as_object()?;
if obj.get("type")?.as_str()? != "text" {
return None;
}
let t = obj.get("text")?.as_str()?;
if t.contains("<system-reminder>") {
return None;
}
Some(t)
})
.collect();
texts.join("\n")
}
_ => continue,
};
let text = text.trim().to_string();
if text.len() < 20 {
continue;
}
let role = if msg_type == "user" {
cfg.user_name.clone()
} else {
cfg.assistant_name.clone()
};
messages.push(Message { role, text, timestamp });
}
messages
}
/// Format messages into a single text for chunking.
fn format_for_extraction(messages: &[Message]) -> String {
messages.iter()
.map(|msg| {
let text = crate::util::truncate(&msg.text, 2800, "\n[...truncated...]");
let ts = if msg.timestamp.len() >= 19 { &msg.timestamp[..19] } else { "" };
if ts.is_empty() {
format!("[{}] {}", msg.role, text)
} else {
format!("[{} {}] {}", msg.role, ts, text)
}
})
.collect::<Vec<_>>()
.join("\n\n")
}
/// Split text into overlapping windows, breaking at paragraph boundaries.
fn chunk_text(text: &str) -> Vec<(usize, &str)> {
let mut chunks = Vec::new();
let mut start = 0;
while start < text.len() {
let mut end = text.floor_char_boundary((start + WINDOW_CHARS).min(text.len()));
// Try to break at a paragraph boundary
if end < text.len() {
if let Some(para) = text[start..end].rfind("\n\n") {
if para > WINDOW_CHARS / 2 {
end = start + para;
}
}
}
chunks.push((start, &text[start..end]));
let next = text.floor_char_boundary(end.saturating_sub(OVERLAP_CHARS));
if next <= start {
start = end;
} else {
start = next;
}
}
chunks
}
/// Parse JSON facts from model response.
fn parse_facts(response: &str) -> Vec<Fact> {
let cleaned = response.trim();
// Strip markdown code block
let cleaned = if cleaned.starts_with("```") {
cleaned.lines()
.filter(|l| !l.starts_with("```"))
.collect::<Vec<_>>()
.join("\n")
} else {
cleaned.to_string()
};
// Find JSON array
let start = cleaned.find('[');
let end = cleaned.rfind(']');
let (Some(start), Some(end)) = (start, end) else { return Vec::new() };
serde_json::from_str(&cleaned[start..=end]).unwrap_or_default()
}
/// Mine a single transcript for atomic facts.
/// The optional `progress` callback receives status strings (e.g. "chunk 3/47").
pub fn mine_transcript(
path: &Path,
dry_run: bool,
progress: Option<&dyn Fn(&str)>,
) -> Result<Vec<Fact>, String> {
let filename = path.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let log = |msg: &str| {
eprintln!("{}", msg);
if let Some(cb) = progress { cb(msg); }
};
log(&format!("Mining: {}", filename));
let messages = extract_conversation(path);
if messages.is_empty() {
log("No messages found");
return Ok(Vec::new());
}
log(&format!("{} messages extracted", messages.len()));
let text = format_for_extraction(&messages);
let chunks = chunk_text(&text);
log(&format!("{} chunks ({} chars)", chunks.len(), text.len()));
if dry_run {
for (i, (offset, chunk)) in chunks.iter().enumerate() {
eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len());
eprintln!("{}", crate::util::truncate(chunk, 500, ""));
if chunk.len() > 500 {
eprintln!(" ... ({} more chars)", chunk.len() - 500);
}
}
return Ok(Vec::new());
}
let prompt_prefix = extraction_prompt();
let mut all_facts = Vec::new();
for (i, (_offset, chunk)) in chunks.iter().enumerate() {
let status = format!("chunk {}/{} ({} chars)", i + 1, chunks.len(), chunk.len());
eprint!(" {}...", status);
if let Some(cb) = progress { cb(&status); }
let prompt = format!("{}{}\n\n--- END OF EXCERPT ---\n\nReturn ONLY a JSON array of factual claims, or [] if none.", prompt_prefix, chunk);
let response = match llm::call_haiku("fact-mine", &prompt) {
Ok(r) => r,
Err(e) => {
eprintln!(" error: {}", e);
continue;
}
};
let mut facts = parse_facts(&response);
for fact in &mut facts {
fact.source_file = Some(filename.clone());
fact.source_chunk = Some(i + 1);
fact.source_offset = Some(*_offset);
}
eprintln!(" {} facts", facts.len());
all_facts.extend(facts);
}
// Deduplicate by claim text
let mut seen = HashSet::new();
let before = all_facts.len();
all_facts.retain(|f| seen.insert(f.claim.to_lowercase()));
let dupes = before - all_facts.len();
if dupes > 0 {
log(&format!("{} duplicates removed", dupes));
}
log(&format!("Total: {} unique facts", all_facts.len()));
Ok(all_facts)
}
/// Mine a transcript and store facts in the capnp store.
/// Returns the number of facts stored.
/// The optional `progress` callback receives status strings for daemon display.
pub fn mine_and_store(
path: &Path,
progress: Option<&dyn Fn(&str)>,
) -> Result<usize, String> {
let facts = mine_transcript(path, false, progress)?;
let filename = path.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
// Always write a marker so we don't re-queue empty transcripts
let json = if facts.is_empty() {
"[]".to_string()
} else {
serde_json::to_string_pretty(&facts)
.map_err(|e| format!("serialize facts: {}", e))?
};
let mut store = store::Store::load()?;
store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?;
store.save()?;
eprintln!(" Stored {} facts as {}", facts.len(), key);
Ok(facts.len())
}
/// Mine transcripts, returning all facts. Skips files with fewer than min_messages.
pub fn mine_batch(paths: &[&Path], min_messages: usize, dry_run: bool) -> Result<Vec<Fact>, String> {
let mut all_facts = Vec::new();
for path in paths {
let messages = extract_conversation(path);
if messages.len() < min_messages {
eprintln!("Skipping {} ({} messages < {})",
path.file_name().map(|n| n.to_string_lossy()).unwrap_or_default(),
messages.len(), min_messages);
continue;
}
let facts = mine_transcript(path, dry_run, None)?;
all_facts.extend(facts);
}
Ok(all_facts)
}
@ -0,0 +1,948 @@
// knowledge.rs — knowledge production agents and convergence loop
//
// Rust port of knowledge_agents.py + knowledge_loop.py.
// Four agents mine the memory graph for new knowledge:
// 1. Observation — extract facts from raw conversations
// 2. Extractor — find patterns in node clusters
// 3. Connector — find cross-domain structural connections
// 4. Challenger — stress-test existing knowledge nodes
//
// The loop runs agents in sequence, applies results, measures
// convergence via graph-structural metrics (sigma, CC, communities).
use crate::graph::Graph;
use super::llm;
use crate::spectral;
use crate::store::{self, Store, new_relation, RelationType};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
fn projects_dir() -> PathBuf {
crate::config::get().projects_dir.clone()
}
// ---------------------------------------------------------------------------
// Action types
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action {
pub kind: ActionKind,
pub confidence: Confidence,
pub weight: f64,
pub depth: i32,
pub applied: Option<bool>,
pub rejected_reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ActionKind {
WriteNode {
key: String,
content: String,
covers: Vec<String>,
},
Link {
source: String,
target: String,
},
Refine {
key: String,
content: String,
},
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Confidence {
High,
Medium,
Low,
}
impl Confidence {
fn weight(self) -> f64 {
match self {
Self::High => 1.0,
Self::Medium => 0.6,
Self::Low => 0.3,
}
}
fn value(self) -> f64 {
match self {
Self::High => 0.9,
Self::Medium => 0.6,
Self::Low => 0.3,
}
}
fn parse(s: &str) -> Self {
match s.to_lowercase().as_str() {
"high" => Self::High,
"low" => Self::Low,
_ => Self::Medium,
}
}
}
// ---------------------------------------------------------------------------
// Action parsing
// ---------------------------------------------------------------------------
pub fn parse_write_nodes(text: &str) -> Vec<Action> {
let re = Regex::new(r"(?s)WRITE_NODE\s+(\S+)\s*\n(.*?)END_NODE").unwrap();
let conf_re = Regex::new(r"(?i)CONFIDENCE:\s*(high|medium|low)").unwrap();
let covers_re = Regex::new(r"COVERS:\s*(.+)").unwrap();
re.captures_iter(text)
.map(|cap| {
let key = cap[1].to_string();
let mut content = cap[2].trim().to_string();
let confidence = conf_re
.captures(&content)
.map(|c| Confidence::parse(&c[1]))
.unwrap_or(Confidence::Medium);
content = conf_re.replace(&content, "").trim().to_string();
let covers: Vec<String> = covers_re
.captures(&content)
.map(|c| c[1].split(',').map(|s| s.trim().to_string()).collect())
.unwrap_or_default();
content = covers_re.replace(&content, "").trim().to_string();
Action {
weight: confidence.weight(),
kind: ActionKind::WriteNode { key, content, covers },
confidence,
depth: 0,
applied: None,
rejected_reason: None,
}
})
.collect()
}
pub fn parse_links(text: &str) -> Vec<Action> {
let re = Regex::new(r"(?m)^LINK\s+(\S+)\s+(\S+)").unwrap();
re.captures_iter(text)
.map(|cap| Action {
kind: ActionKind::Link {
source: cap[1].to_string(),
target: cap[2].to_string(),
},
confidence: Confidence::Low,
weight: 0.3,
depth: -1,
applied: None,
rejected_reason: None,
})
.collect()
}
pub fn parse_refines(text: &str) -> Vec<Action> {
let re = Regex::new(r"(?s)REFINE\s+(\S+)\s*\n(.*?)END_REFINE").unwrap();
re.captures_iter(text)
.map(|cap| {
let key = cap[1].trim_matches('*').trim().to_string();
Action {
kind: ActionKind::Refine {
key,
content: cap[2].trim().to_string(),
},
confidence: Confidence::Medium,
weight: 0.7,
depth: 0,
applied: None,
rejected_reason: None,
}
})
.collect()
}
pub fn parse_all_actions(text: &str) -> Vec<Action> {
let mut actions = parse_write_nodes(text);
actions.extend(parse_links(text));
actions.extend(parse_refines(text));
actions
}
pub fn count_no_ops(text: &str) -> usize {
let no_conn = Regex::new(r"\bNO_CONNECTION\b").unwrap().find_iter(text).count();
let affirm = Regex::new(r"\bAFFIRM\b").unwrap().find_iter(text).count();
let no_extract = Regex::new(r"\bNO_EXTRACTION\b").unwrap().find_iter(text).count();
no_conn + affirm + no_extract
}
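#[cfg(test)]
mod action_parse_tests {
    use super::*;
    // Sketch of the agent output grammar the parsers above expect
    // (inferred from the regexes; not an exhaustive spec).
    #[test]
    fn parses_write_node_and_link() {
        let text = "WRITE_NODE topic-foo\nCONFIDENCE: high\nCOVERS: a, b\nBody text.\nEND_NODE\nLINK a b\n";
        let actions = parse_all_actions(text);
        assert_eq!(actions.len(), 2);
        match &actions[0].kind {
            ActionKind::WriteNode { key, content, covers } => {
                assert_eq!(key, "topic-foo");
                assert_eq!(covers, &vec!["a".to_string(), "b".to_string()]);
                assert_eq!(content, "Body text.");
            }
            other => panic!("expected WriteNode, got {:?}", other),
        }
        assert!(matches!(&actions[1].kind,
            ActionKind::Link { source, target } if source == "a" && target == "b"));
    }
}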
// ---------------------------------------------------------------------------
// Inference depth tracking
// ---------------------------------------------------------------------------
const DEPTH_DB_KEY: &str = "_knowledge-depths";
#[derive(Default)]
pub struct DepthDb {
depths: HashMap<String, i32>,
}
impl DepthDb {
pub fn load(store: &Store) -> Self {
let depths = store.nodes.get(DEPTH_DB_KEY)
.and_then(|n| serde_json::from_str(&n.content).ok())
.unwrap_or_default();
Self { depths }
}
pub fn save(&self, store: &mut Store) {
if let Ok(json) = serde_json::to_string(&self.depths) {
store.upsert_provenance(DEPTH_DB_KEY, &json,
store::Provenance::AgentKnowledgeObservation).ok();
}
}
pub fn get(&self, key: &str) -> i32 {
self.depths.get(key).copied().unwrap_or(0)
}
pub fn set(&mut self, key: String, depth: i32) {
self.depths.insert(key, depth);
}
}
/// Agent base depths: observation=1, extractor=2, connector=3 (challenger: none)
fn agent_base_depth(agent: &str) -> Option<i32> {
match agent {
"observation" => Some(1),
"extractor" => Some(2),
"connector" => Some(3),
"challenger" => None,
_ => Some(2),
}
}
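/// Inference depth of a proposed action: links carry no depth (-1), refines
/// inherit the refined node's depth, and new nodes sit one level above the
/// deepest node they cover (or at the agent's base depth if covers is empty).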
pub fn compute_action_depth(db: &DepthDb, action: &Action, agent: &str) -> i32 {
match &action.kind {
ActionKind::Link { .. } => -1,
ActionKind::Refine { key, .. } => db.get(key),
ActionKind::WriteNode { covers, .. } => {
if !covers.is_empty() {
covers.iter().map(|k| db.get(k)).max().unwrap_or(0) + 1
} else {
agent_base_depth(agent).unwrap_or(2)
}
}
}
}
/// Confidence threshold that scales with inference depth.
pub fn required_confidence(depth: i32, base: f64) -> f64 {
if depth <= 0 {
return 0.0;
}
1.0 - (1.0 - base).powi(depth)
}
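// Worked example for required_confidence (illustrative): with base = 0.3 the
// threshold climbs 0.30 → 0.51 → 0.66 → 0.76 for depths 1-4, so each extra
// inference hop demands noticeably more confidence before it is applied.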
/// Confidence bonus from real-world use.
pub fn use_bonus(use_count: u32) -> f64 {
if use_count == 0 {
return 0.0;
}
1.0 - 1.0 / (1.0 + 0.15 * use_count as f64)
}
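// Worked example for use_bonus (illustrative): 1 use → ~0.13, 5 uses → ~0.43,
// 20 uses → 0.75; the bonus saturates toward 1.0 but never reaches it.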
// ---------------------------------------------------------------------------
// Action application
// ---------------------------------------------------------------------------
fn stamp_content(content: &str, agent: &str, timestamp: &str, depth: i32) -> String {
format!("<!-- author: {} | created: {} | depth: {} -->\n{}", agent, timestamp, depth, content)
}
/// Check if a link already exists between two keys.
fn has_edge(store: &Store, source: &str, target: &str) -> bool {
store.relations.iter().any(|r| {
!r.deleted
&& ((r.source_key == source && r.target_key == target)
|| (r.source_key == target && r.target_key == source))
})
}
pub fn apply_action(
store: &mut Store,
action: &Action,
agent: &str,
timestamp: &str,
depth: i32,
) -> bool {
let provenance = agent_provenance(agent);
match &action.kind {
ActionKind::WriteNode { key, content, .. } => {
let stamped = stamp_content(content, agent, timestamp, depth);
store.upsert_provenance(key, &stamped, provenance).is_ok()
}
ActionKind::Link { source, target } => {
if has_edge(store, source, target) {
return false;
}
let source_uuid = match store.nodes.get(source.as_str()) {
Some(n) => n.uuid,
None => return false,
};
let target_uuid = match store.nodes.get(target.as_str()) {
Some(n) => n.uuid,
None => return false,
};
let mut rel = new_relation(
source_uuid, target_uuid,
RelationType::Link,
0.3,
source, target,
);
rel.provenance = provenance;
store.add_relation(rel).is_ok()
}
ActionKind::Refine { key, content } => {
let stamped = stamp_content(content, agent, timestamp, depth);
store.upsert_provenance(key, &stamped, provenance).is_ok()
}
}
}
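/// Map an agent name to the provenance tag stamped on its writes.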
fn agent_provenance(agent: &str) -> store::Provenance {
match agent {
"observation" => store::Provenance::AgentKnowledgeObservation,
"extractor" | "pattern" => store::Provenance::AgentKnowledgePattern,
"connector" => store::Provenance::AgentKnowledgeConnector,
"challenger" => store::Provenance::AgentKnowledgeChallenger,
_ => store::Provenance::Agent,
}
}
// ---------------------------------------------------------------------------
// Agent runners
// ---------------------------------------------------------------------------
fn load_prompt(name: &str) -> Result<String, String> {
super::prompts::load_prompt(name, &[])
}
fn get_graph_topology(store: &Store, graph: &Graph) -> String {
format!("Nodes: {} Relations: {}\n", store.nodes.len(), graph.edge_count())
}
/// Strip <system-reminder> blocks from text
fn strip_system_tags(text: &str) -> String {
let re = Regex::new(r"(?s)<system-reminder>.*?</system-reminder>").unwrap();
re.replace_all(text, "").trim().to_string()
}
/// Extract human-readable dialogue from a conversation JSONL
fn extract_conversation_text(path: &Path, max_chars: usize) -> String {
let Ok(content) = fs::read_to_string(path) else { return String::new() };
let mut fragments = Vec::new();
let mut total = 0;
for line in content.lines() {
let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else { continue };
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
if msg_type == "user" && obj.get("userType").and_then(|v| v.as_str()) == Some("external") {
if let Some(text) = extract_text_content(&obj) {
let text = strip_system_tags(&text);
if text.starts_with("[Request interrupted") { continue; }
if text.len() > 5 {
fragments.push(format!("**{}:** {}", crate::config::get().user_name, text));
total += text.len();
}
}
} else if msg_type == "assistant" {
if let Some(text) = extract_text_content(&obj) {
let text = strip_system_tags(&text);
if text.len() > 10 {
fragments.push(format!("**{}:** {}", crate::config::get().assistant_name, text));
total += text.len();
}
}
}
if total > max_chars { break; }
}
fragments.join("\n\n")
}
fn extract_text_content(obj: &serde_json::Value) -> Option<String> {
let msg = obj.get("message")?;
let content = msg.get("content")?;
if let Some(s) = content.as_str() {
return Some(s.to_string());
}
if let Some(arr) = content.as_array() {
let texts: Vec<&str> = arr.iter()
.filter_map(|b| {
if b.get("type")?.as_str()? == "text" {
b.get("text")?.as_str()
} else {
None
}
})
.collect();
if !texts.is_empty() {
return Some(texts.join("\n"));
}
}
None
}
/// Count short user messages (dialogue turns) in a JSONL
fn count_dialogue_turns(path: &Path) -> usize {
let Ok(content) = fs::read_to_string(path) else { return 0 };
content.lines()
.filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
.filter(|obj| {
obj.get("type").and_then(|v| v.as_str()) == Some("user")
&& obj.get("userType").and_then(|v| v.as_str()) == Some("external")
})
.filter(|obj| {
let text = extract_text_content(obj).unwrap_or_default();
text.len() > 5 && text.len() < 500
&& !text.starts_with("[Request interrupted")
&& !text.starts_with("Implement the following")
})
.count()
}
/// Select conversation fragments for the observation extractor
fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
let projects = projects_dir();
if !projects.exists() { return Vec::new(); }
let mut jsonl_files: Vec<PathBuf> = Vec::new();
if let Ok(dirs) = fs::read_dir(&projects) {
for dir in dirs.filter_map(|e| e.ok()) {
if !dir.path().is_dir() { continue; }
if let Ok(files) = fs::read_dir(dir.path()) {
for f in files.filter_map(|e| e.ok()) {
let p = f.path();
if p.extension().map(|x| x == "jsonl").unwrap_or(false) {
if let Ok(meta) = p.metadata() {
if meta.len() > 50_000 {
jsonl_files.push(p);
}
}
}
}
}
}
}
let mut scored: Vec<(usize, PathBuf)> = jsonl_files.into_iter()
.map(|f| (count_dialogue_turns(&f), f))
.filter(|(turns, _)| *turns >= 10)
.collect();
scored.sort_by(|a, b| b.0.cmp(&a.0));
let mut fragments = Vec::new();
for (_, f) in scored.iter().take(n * 2) {
let session_id = f.file_stem()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let text = extract_conversation_text(f, 8000);
if text.len() > 500 {
fragments.push((session_id, text));
}
if fragments.len() >= n { break; }
}
fragments
}
pub fn run_observation_extractor(store: &Store, graph: &Graph, batch_size: usize) -> Result<String, String> {
let template = load_prompt("observation-extractor")?;
let topology = get_graph_topology(store, graph);
let fragments = select_conversation_fragments(batch_size);
let mut results = Vec::new();
for (i, (session_id, text)) in fragments.iter().enumerate() {
eprintln!(" Observation extractor {}/{}: session {}... ({} chars)",
i + 1, fragments.len(), &session_id[..session_id.len().min(12)], text.len());
let prompt = template
.replace("{{TOPOLOGY}}", &topology)
.replace("{{CONVERSATIONS}}", &format!("### Session {}\n\n{}", session_id, text));
let response = llm::call_sonnet("knowledge", &prompt)?;
results.push(format!("## Session: {}\n\n{}", session_id, response));
}
Ok(results.join("\n\n---\n\n"))
}
/// Load spectral embedding from disk
fn load_spectral_embedding() -> HashMap<String, Vec<f64>> {
spectral::load_embedding()
.map(|emb| emb.coords)
.unwrap_or_default()
}
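/// Cosine distance (1 - cosine similarity) between two keys' spectral
/// coordinates; infinite if either key is missing or has a zero vector.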
fn spectral_distance(embedding: &HashMap<String, Vec<f64>>, a: &str, b: &str) -> f64 {
let (Some(va), Some(vb)) = (embedding.get(a), embedding.get(b)) else {
return f64::INFINITY;
};
let dot: f64 = va.iter().zip(vb.iter()).map(|(a, b)| a * b).sum();
let norm_a: f64 = va.iter().map(|x| x * x).sum::<f64>().sqrt();
let norm_b: f64 = vb.iter().map(|x| x * x).sum::<f64>().sqrt();
if norm_a == 0.0 || norm_b == 0.0 {
return f64::INFINITY;
}
1.0 - dot / (norm_a * norm_b)
}
fn select_extractor_clusters(_store: &Store, n: usize) -> Vec<Vec<String>> {
let embedding = load_spectral_embedding();
let semantic_keys: Vec<&String> = embedding.keys().collect();
let cluster_size = 5;
let mut used = HashSet::new();
let mut clusters = Vec::new();
for _ in 0..n {
let available: Vec<&&String> = semantic_keys.iter()
.filter(|k| !used.contains(**k))
.collect();
if available.len() < cluster_size { break; }
let seed = available[0];
let mut distances: Vec<(f64, &String)> = available.iter()
.filter(|k| ***k != *seed)
.map(|k| (spectral_distance(&embedding, seed, k), **k))
.filter(|(d, _)| d.is_finite())
.collect();
distances.sort_by(|a, b| a.0.total_cmp(&b.0));
let cluster: Vec<String> = std::iter::once((*seed).clone())
.chain(distances.iter().take(cluster_size - 1).map(|(_, k)| (*k).clone()))
.collect();
for k in &cluster { used.insert(k.clone()); }
clusters.push(cluster);
}
clusters
}
pub fn run_extractor(store: &Store, graph: &Graph, batch_size: usize) -> Result<String, String> {
let template = load_prompt("extractor")?;
let topology = get_graph_topology(store, graph);
let clusters = select_extractor_clusters(store, batch_size);
let mut results = Vec::new();
for (i, cluster) in clusters.iter().enumerate() {
eprintln!(" Extractor cluster {}/{}: {} nodes", i + 1, clusters.len(), cluster.len());
let node_texts: Vec<String> = cluster.iter()
.filter_map(|key| {
let content = store.nodes.get(key)?.content.as_str();
Some(format!("### {}\n{}", key, content))
})
.collect();
if node_texts.is_empty() { continue; }
let prompt = template
.replace("{{TOPOLOGY}}", &topology)
.replace("{{NODES}}", &node_texts.join("\n\n"));
let response = llm::call_sonnet("knowledge", &prompt)?;
results.push(format!("## Cluster {}: {}...\n\n{}", i + 1,
cluster.iter().take(3).cloned().collect::<Vec<_>>().join(", "), response));
}
Ok(results.join("\n\n---\n\n"))
}
fn select_connector_pairs(store: &Store, graph: &Graph, n: usize) -> Vec<(Vec<String>, Vec<String>)> {
let embedding = load_spectral_embedding();
let semantic_keys: Vec<&String> = embedding.keys().collect();
let mut pairs = Vec::new();
let mut used = HashSet::new();
for seed in semantic_keys.iter().take(n * 10) {
if used.contains(*seed) { continue; }
let mut near: Vec<(f64, &String)> = semantic_keys.iter()
.filter(|k| ***k != **seed && !used.contains(**k))
.map(|k| (spectral_distance(&embedding, seed, k), *k))
.filter(|(d, _)| *d < 0.5 && d.is_finite())
.collect();
near.sort_by(|a, b| a.0.total_cmp(&b.0));
for (_, target) in near.iter().take(5) {
if !has_edge(store, seed, target) {
let _ = graph; // graph available for future use
used.insert((*seed).clone());
used.insert((*target).clone());
pairs.push((vec![(*seed).clone()], vec![(*target).clone()]));
break;
}
}
if pairs.len() >= n { break; }
}
pairs
}
pub fn run_connector(store: &Store, graph: &Graph, batch_size: usize) -> Result<String, String> {
let template = load_prompt("connector")?;
let topology = get_graph_topology(store, graph);
let pairs = select_connector_pairs(store, graph, batch_size);
let mut results = Vec::new();
for (i, (group_a, group_b)) in pairs.iter().enumerate() {
eprintln!(" Connector pair {}/{}", i + 1, pairs.len());
let nodes_a: Vec<String> = group_a.iter()
.filter_map(|k| {
let c = store.nodes.get(k)?.content.as_str();
Some(format!("### {}\n{}", k, c))
})
.collect();
let nodes_b: Vec<String> = group_b.iter()
.filter_map(|k| {
let c = store.nodes.get(k)?.content.as_str();
Some(format!("### {}\n{}", k, c))
})
.collect();
let prompt = template
.replace("{{TOPOLOGY}}", &topology)
.replace("{{NODES_A}}", &nodes_a.join("\n\n"))
.replace("{{NODES_B}}", &nodes_b.join("\n\n"));
let response = llm::call_sonnet("knowledge", &prompt)?;
        results.push(format!("## Pair {}: {} ↔ {}\n\n{}",
            i + 1, group_a.join(", "), group_b.join(", "), response));
}
Ok(results.join("\n\n---\n\n"))
}
pub fn run_challenger(store: &Store, graph: &Graph, batch_size: usize) -> Result<String, String> {
let template = load_prompt("challenger")?;
let topology = get_graph_topology(store, graph);
let mut candidates: Vec<(&String, usize)> = store.nodes.keys()
.map(|k| (k, graph.degree(k)))
.collect();
candidates.sort_by(|a, b| b.1.cmp(&a.1));
let mut results = Vec::new();
for (i, (key, _)) in candidates.iter().take(batch_size).enumerate() {
eprintln!(" Challenger {}/{}: {}", i + 1, batch_size.min(candidates.len()), key);
let content = match store.nodes.get(key.as_str()) {
Some(n) => &n.content,
None => continue,
};
let prompt = template
.replace("{{TOPOLOGY}}", &topology)
.replace("{{NODE_KEY}}", key)
.replace("{{NODE_CONTENT}}", content);
let response = llm::call_sonnet("knowledge", &prompt)?;
results.push(format!("## Challenge: {}\n\n{}", key, response));
}
Ok(results.join("\n\n---\n\n"))
}
// ---------------------------------------------------------------------------
// Convergence metrics
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CycleResult {
pub cycle: usize,
pub timestamp: String,
pub total_actions: usize,
pub total_applied: usize,
pub total_no_ops: usize,
pub depth_rejected: usize,
pub weighted_delta: f64,
pub graph_metrics_before: GraphMetrics,
pub graph_metrics_after: GraphMetrics,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GraphMetrics {
pub nodes: usize,
pub edges: usize,
pub cc: f64,
pub sigma: f64,
pub communities: usize,
}
impl GraphMetrics {
pub fn from_graph(store: &Store, graph: &Graph) -> Self {
Self {
nodes: store.nodes.len(),
edges: graph.edge_count(),
cc: graph.avg_clustering_coefficient() as f64,
sigma: graph.small_world_sigma() as f64,
communities: graph.community_count(),
}
}
}
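/// Coefficient of variation (stddev / |mean|) of one after-metric over the
/// last `window` cycles; infinite until enough history has accumulated.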
fn metric_stability(history: &[CycleResult], key: &str, window: usize) -> f64 {
if history.len() < window { return f64::INFINITY; }
let values: Vec<f64> = history[history.len() - window..].iter()
.map(|h| match key {
"sigma" => h.graph_metrics_after.sigma,
"cc" => h.graph_metrics_after.cc,
"communities" => h.graph_metrics_after.communities as f64,
_ => 0.0,
})
.collect();
if values.len() < 2 { return f64::INFINITY; }
let mean = values.iter().sum::<f64>() / values.len() as f64;
if mean == 0.0 { return 0.0; }
let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
variance.sqrt() / mean.abs()
}
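/// Converged when structure is stable (low CV on sigma, CC, communities)
/// and behavior has quieted (low average weighted delta).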
pub fn check_convergence(history: &[CycleResult], window: usize) -> bool {
if history.len() < window { return false; }
let sigma_cv = metric_stability(history, "sigma", window);
let cc_cv = metric_stability(history, "cc", window);
let comm_cv = metric_stability(history, "communities", window);
let recent = &history[history.len() - window..];
let avg_delta = recent.iter().map(|r| r.weighted_delta).sum::<f64>() / recent.len() as f64;
eprintln!("\n Convergence check (last {} cycles):", window);
eprintln!(" sigma CV: {:.4} (< 0.05?)", sigma_cv);
eprintln!(" CC CV: {:.4} (< 0.05?)", cc_cv);
eprintln!(" community CV: {:.4} (< 0.10?)", comm_cv);
eprintln!(" avg delta: {:.2} (< 1.00?)", avg_delta);
let structural = sigma_cv < 0.05 && cc_cv < 0.05 && comm_cv < 0.10;
let behavioral = avg_delta < 1.0;
if structural && behavioral {
eprintln!(" → CONVERGED");
true
} else {
false
}
}
// ---------------------------------------------------------------------------
// The knowledge loop
// ---------------------------------------------------------------------------
pub struct KnowledgeLoopConfig {
pub max_cycles: usize,
pub batch_size: usize,
pub window: usize,
pub max_depth: i32,
pub confidence_base: f64,
}
impl Default for KnowledgeLoopConfig {
fn default() -> Self {
Self {
max_cycles: 20,
batch_size: 5,
window: 5,
max_depth: 4,
confidence_base: 0.3,
}
}
}
pub fn run_knowledge_loop(config: &KnowledgeLoopConfig) -> Result<Vec<CycleResult>, String> {
let mut store = Store::load()?;
let mut depth_db = DepthDb::load(&store);
let mut history = Vec::new();
eprintln!("Knowledge Loop — fixed-point iteration");
eprintln!(" max_cycles={} batch_size={}", config.max_cycles, config.batch_size);
eprintln!(" window={} max_depth={}", config.window, config.max_depth);
for cycle in 1..=config.max_cycles {
let result = run_cycle(cycle, config, &mut depth_db)?;
history.push(result);
if check_convergence(&history, config.window) {
eprintln!("\n CONVERGED after {} cycles", cycle);
break;
}
}
// Save loop summary as a store node
if let Some(first) = history.first() {
let key = format!("_knowledge-loop-{}", first.timestamp);
if let Ok(json) = serde_json::to_string_pretty(&history) {
store = Store::load()?;
store.upsert_provenance(&key, &json,
store::Provenance::AgentKnowledgeObservation).ok();
depth_db.save(&mut store);
store.save()?;
}
}
Ok(history)
}
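/// One knowledge cycle: run the four agents in sequence, gate and apply
/// their actions, recompute the spectral embedding if anything changed,
/// and remeasure graph metrics.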
fn run_cycle(
cycle_num: usize,
config: &KnowledgeLoopConfig,
depth_db: &mut DepthDb,
) -> Result<CycleResult, String> {
let timestamp = chrono::Local::now().format("%Y%m%dT%H%M%S").to_string();
eprintln!("\n{}", "=".repeat(60));
eprintln!("CYCLE {}{}", cycle_num, timestamp);
eprintln!("{}", "=".repeat(60));
let mut store = Store::load()?;
let graph = store.build_graph();
let metrics_before = GraphMetrics::from_graph(&store, &graph);
eprintln!(" Before: nodes={} edges={} cc={:.3} sigma={:.3}",
metrics_before.nodes, metrics_before.edges, metrics_before.cc, metrics_before.sigma);
let mut all_actions = Vec::new();
let mut all_no_ops = 0;
let mut depth_rejected = 0;
let mut total_applied = 0;
// Run each agent, rebuilding graph after mutations
let agent_names = ["observation", "extractor", "connector", "challenger"];
for agent_name in &agent_names {
eprintln!("\n --- {} (n={}) ---", agent_name, config.batch_size);
// Rebuild graph to reflect any mutations from previous agents
let graph = store.build_graph();
let output = match *agent_name {
"observation" => run_observation_extractor(&store, &graph, config.batch_size),
"extractor" => run_extractor(&store, &graph, config.batch_size),
"connector" => run_connector(&store, &graph, config.batch_size),
"challenger" => run_challenger(&store, &graph, config.batch_size),
_ => unreachable!(),
};
let output = match output {
Ok(o) => o,
Err(e) => {
eprintln!(" ERROR: {}", e);
continue;
}
};
// Store raw output as a node (for debugging/audit)
let raw_key = format!("_knowledge-{}-{}", agent_name, timestamp);
let raw_content = format!("# {} Agent Results — {}\n\n{}", agent_name, timestamp, output);
store.upsert_provenance(&raw_key, &raw_content,
agent_provenance(agent_name)).ok();
let mut actions = parse_all_actions(&output);
let no_ops = count_no_ops(&output);
all_no_ops += no_ops;
eprintln!(" Actions: {} No-ops: {}", actions.len(), no_ops);
let mut applied = 0;
for action in &mut actions {
let depth = compute_action_depth(depth_db, action, agent_name);
action.depth = depth;
match &action.kind {
ActionKind::WriteNode { key, covers, .. } => {
let conf_val = action.confidence.value();
let req = required_confidence(depth, config.confidence_base);
let source_uses: Vec<u32> = covers.iter()
.filter_map(|k| store.nodes.get(k).map(|n| n.uses))
.collect();
let avg_uses = if source_uses.is_empty() { 0 }
else { source_uses.iter().sum::<u32>() / source_uses.len() as u32 };
let eff_conf = (conf_val + use_bonus(avg_uses)).min(1.0);
if eff_conf < req {
action.applied = Some(false);
action.rejected_reason = Some("depth_threshold".into());
depth_rejected += 1;
continue;
}
if depth > config.max_depth {
action.applied = Some(false);
action.rejected_reason = Some("max_depth".into());
depth_rejected += 1;
continue;
}
eprintln!(" WRITE {} depth={} conf={:.2} eff={:.2} req={:.2}",
key, depth, conf_val, eff_conf, req);
}
ActionKind::Link { source, target } => {
                eprintln!(" LINK {} → {}", source, target);
}
ActionKind::Refine { key, .. } => {
eprintln!(" REFINE {} depth={}", key, depth);
}
}
if apply_action(&mut store, action, agent_name, &timestamp, depth) {
applied += 1;
action.applied = Some(true);
if let ActionKind::WriteNode { key, .. } | ActionKind::Refine { key, .. } = &action.kind {
depth_db.set(key.clone(), depth);
}
} else {
action.applied = Some(false);
}
}
eprintln!(" Applied: {}/{}", applied, actions.len());
total_applied += applied;
all_actions.extend(actions);
}
depth_db.save(&mut store);
// Recompute spectral if anything changed
if total_applied > 0 {
eprintln!("\n Recomputing spectral embedding...");
let graph = store.build_graph();
let result = spectral::decompose(&graph, 8);
let emb = spectral::to_embedding(&result);
spectral::save_embedding(&emb).ok();
}
let graph = store.build_graph();
let metrics_after = GraphMetrics::from_graph(&store, &graph);
let weighted_delta: f64 = all_actions.iter()
.filter(|a| a.applied == Some(true))
.map(|a| a.weight)
.sum();
eprintln!("\n CYCLE {} SUMMARY", cycle_num);
eprintln!(" Applied: {}/{} depth-rejected: {} no-ops: {}",
total_applied, all_actions.len(), depth_rejected, all_no_ops);
eprintln!(" Weighted delta: {:.2}", weighted_delta);
Ok(CycleResult {
cycle: cycle_num,
timestamp,
total_actions: all_actions.len(),
total_applied,
total_no_ops: all_no_ops,
depth_rejected,
weighted_delta,
graph_metrics_before: metrics_before,
graph_metrics_after: metrics_after,
})
}
@ -0,0 +1,144 @@
// LLM utilities: model invocation and response parsing
//
// Calls claude CLI as a subprocess. Uses prctl(PR_SET_PDEATHSIG)
// so child processes die when the daemon exits, preventing orphans.
use crate::store::Store;
use regex::Regex;
use std::fs;
use std::os::unix::process::CommandExt;
use std::process::Command;
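/// Append the prompt/response pair to a per-agent, per-day markdown log
/// under <data_dir>/llm-logs/<agent>/.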
fn log_usage(agent: &str, model: &str, prompt: &str, response: &str,
duration_ms: u128, ok: bool) {
let dir = crate::config::get().data_dir.join("llm-logs").join(agent);
let _ = fs::create_dir_all(&dir);
let date = chrono::Local::now().format("%Y-%m-%d");
let path = dir.join(format!("{}.md", date));
let ts = chrono::Local::now().format("%H:%M:%S");
let status = if ok { "ok" } else { "ERROR" };
let entry = format!(
"\n## {} — {} ({}, {:.1}s, {})\n\n\
### Prompt ({} chars)\n\n\
```\n{}\n```\n\n\
### Response ({} chars)\n\n\
```\n{}\n```\n\n---\n",
ts, agent, model, duration_ms as f64 / 1000.0, status,
prompt.len(), prompt,
response.len(), response,
);
use std::io::Write;
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
let _ = f.write_all(entry.as_bytes());
}
}
/// Call a model via claude CLI. Returns the response text.
///
/// Sets PR_SET_PDEATHSIG on the child so it gets SIGTERM if the
/// parent daemon exits — no more orphaned claude processes.
fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String> {
    // Write the prompt to a temp file and stream it to claude via stdin;
    // large prompts can't be passed safely as a command-line argument.
let tmp = std::env::temp_dir().join(format!("poc-llm-{}-{:?}.txt",
std::process::id(), std::thread::current().id()));
fs::write(&tmp, prompt)
.map_err(|e| format!("write temp prompt: {}", e))?;
let mut cmd = Command::new("claude");
cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence"])
.stdin(fs::File::open(&tmp).map_err(|e| format!("open temp: {}", e))?)
.env_remove("CLAUDECODE");
// Use separate OAuth credentials for agent work if configured
if let Some(ref dir) = crate::config::get().agent_config_dir {
cmd.env("CLAUDE_CONFIG_DIR", dir);
}
// Tell hooks this is a daemon agent call, not interactive
cmd.env("POC_AGENT", "1");
let start = std::time::Instant::now();
let result = unsafe {
cmd.pre_exec(|| {
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
Ok(())
})
.output()
};
fs::remove_file(&tmp).ok();
match result {
Ok(output) => {
let elapsed = start.elapsed().as_millis();
if output.status.success() {
let response = String::from_utf8_lossy(&output.stdout).trim().to_string();
log_usage(agent, model, prompt, &response, elapsed, true);
Ok(response)
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
let preview = crate::util::first_n_chars(&stderr, 500);
log_usage(agent, model, prompt, &preview, elapsed, false);
Err(format!("claude exited {}: {}", output.status, preview.trim()))
}
}
Err(e) => Err(format!("spawn claude: {}", e)),
}
}
/// Call Sonnet via claude CLI.
pub(crate) fn call_sonnet(agent: &str, prompt: &str) -> Result<String, String> {
call_model(agent, "sonnet", prompt)
}
/// Call Haiku via claude CLI (cheaper, faster — good for high-volume extraction).
pub(crate) fn call_haiku(agent: &str, prompt: &str) -> Result<String, String> {
call_model(agent, "haiku", prompt)
}
/// Parse a JSON response, handling markdown fences.
pub(crate) fn parse_json_response(response: &str) -> Result<serde_json::Value, String> {
let cleaned = response.trim();
let cleaned = cleaned.strip_prefix("```json").unwrap_or(cleaned);
let cleaned = cleaned.strip_prefix("```").unwrap_or(cleaned);
let cleaned = cleaned.strip_suffix("```").unwrap_or(cleaned);
let cleaned = cleaned.trim();
if let Ok(v) = serde_json::from_str(cleaned) {
return Ok(v);
}
// Try to find JSON object or array
let re_obj = Regex::new(r"\{[\s\S]*\}").unwrap();
let re_arr = Regex::new(r"\[[\s\S]*\]").unwrap();
if let Some(m) = re_obj.find(cleaned) {
if let Ok(v) = serde_json::from_str(m.as_str()) {
return Ok(v);
}
}
if let Some(m) = re_arr.find(cleaned) {
if let Ok(v) = serde_json::from_str(m.as_str()) {
return Ok(v);
}
}
let preview = crate::util::first_n_chars(cleaned, 200);
Err(format!("no valid JSON in response: {preview}..."))
}
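#[cfg(test)]
mod parse_json_tests {
    use super::*;
    // Sketch: the parser should tolerate fenced output and surrounding prose.
    #[test]
    fn handles_fences_and_prose() {
        let v = parse_json_response("```json\n{\"a\": 1}\n```").unwrap();
        assert_eq!(v["a"], 1);
        let v = parse_json_response("Sure, here it is: [1, 2, 3]").unwrap();
        assert_eq!(v.as_array().unwrap().len(), 3);
    }
}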
/// Get node keys for prompt context (sorted, capped at 200).
pub(crate) fn semantic_keys(store: &Store) -> Vec<String> {
let mut keys: Vec<String> = store.nodes.keys()
.cloned()
.collect();
keys.sort();
keys.truncate(200);
keys
}
@ -0,0 +1,25 @@
// Agent layer: LLM-powered operations on the memory graph
//
// Everything here calls external models (Sonnet, Haiku) or orchestrates
// sequences of such calls. The core graph infrastructure (store, graph,
// spectral, search, similarity) lives at the crate root.
//
// llm — model invocation, response parsing
// prompts — prompt generation from store data
// audit — link quality review via Sonnet
// consolidate — full consolidation pipeline
// knowledge — knowledge production agents + convergence loop
// enrich — journal enrichment, experience mining
// fact_mine — fact extraction from transcripts
// digest — episodic digest generation (daily/weekly/monthly)
// daemon — background job scheduler
pub mod llm;
pub mod prompts;
pub mod audit;
pub mod consolidate;
pub mod knowledge;
pub mod enrich;
pub mod fact_mine;
pub mod digest;
pub mod daemon;
@ -0,0 +1,374 @@
// Agent prompt generation and formatting. Presentation logic —
// builds text prompts from store data for consolidation agents.
use crate::store::Store;
use crate::graph::Graph;
use crate::similarity;
use crate::spectral;
use crate::neuro::{
ReplayItem, consolidation_priority,
replay_queue, replay_queue_with_graph, detect_interference,
};
/// Load a prompt template, replacing {{PLACEHOLDER}} with data
pub fn load_prompt(name: &str, replacements: &[(&str, &str)]) -> Result<String, String> {
let path = crate::config::get().prompts_dir.join(format!("{}.md", name));
let mut content = std::fs::read_to_string(&path)
.map_err(|e| format!("load prompt {}: {}", path.display(), e))?;
for (placeholder, data) in replacements {
content = content.replace(placeholder, data);
}
Ok(content)
}
/// Format topology header for agent prompts — current graph health metrics
fn format_topology_header(graph: &Graph) -> String {
let sigma = graph.small_world_sigma();
let alpha = graph.degree_power_law_exponent();
let gini = graph.degree_gini();
let avg_cc = graph.avg_clustering_coefficient();
let n = graph.nodes().len();
let e = graph.edge_count();
// Identify saturated hubs — nodes with degree well above threshold
let threshold = graph.hub_threshold();
let mut hubs: Vec<_> = graph.nodes().iter()
.map(|k| (k.clone(), graph.degree(k)))
.filter(|(_, d)| *d >= threshold)
.collect();
hubs.sort_by(|a, b| b.1.cmp(&a.1));
hubs.truncate(15);
let hub_list = if hubs.is_empty() {
String::new()
} else {
let lines: Vec<String> = hubs.iter()
.map(|(k, d)| format!(" - {} (degree {})", k, d))
.collect();
format!(
"### SATURATED HUBS — DO NOT LINK TO THESE\n\
The following nodes are already over-connected. Adding more links\n\
to them makes the graph worse (star topology). Find lateral\n\
connections between peripheral nodes instead.\n\n{}\n\n\
Only link to a hub if it is genuinely the ONLY reasonable target.\n\n",
lines.join("\n"))
};
format!(
"## Current graph topology\n\
Nodes: {} Edges: {} Communities: {}\n\
Small-world σ: {:.1} Power-law α: {:.2} Degree Gini: {:.3}\n\
Avg clustering coefficient: {:.4}\n\n\
{}\
Each node below shows its hub-link ratio (fraction of edges to top-5% degree nodes).\n\
Use `poc-memory link-impact SOURCE TARGET` to evaluate proposed links.\n\n",
n, e, graph.community_count(), sigma, alpha, gini, avg_cc, hub_list)
}
/// Format node data section for prompt templates
fn format_nodes_section(store: &Store, items: &[ReplayItem], graph: &Graph) -> String {
let hub_thresh = graph.hub_threshold();
let mut out = String::new();
for item in items {
let node = match store.nodes.get(&item.key) {
Some(n) => n,
None => continue,
};
out.push_str(&format!("## {} \n", item.key));
out.push_str(&format!("Priority: {:.3} CC: {:.3} Emotion: {:.1} ",
item.priority, item.cc, item.emotion));
out.push_str(&format!("Interval: {}d\n",
node.spaced_repetition_interval));
if item.outlier_score > 0.0 {
out.push_str(&format!("Spectral: {} (outlier={:.1})\n",
item.classification, item.outlier_score));
}
if let Some(community) = node.community_id {
out.push_str(&format!("Community: {} ", community));
}
let deg = graph.degree(&item.key);
let cc = graph.clustering_coefficient(&item.key);
// Hub-link ratio: what fraction of this node's edges go to hubs?
let neighbors = graph.neighbors(&item.key);
let hub_links = neighbors.iter()
.filter(|(n, _)| graph.degree(n) >= hub_thresh)
.count();
let hub_ratio = if deg > 0 { hub_links as f32 / deg as f32 } else { 0.0 };
let is_hub = deg >= hub_thresh;
out.push_str(&format!("Degree: {} CC: {:.3} Hub-link ratio: {:.0}% ({}/{})",
deg, cc, hub_ratio * 100.0, hub_links, deg));
if is_hub {
out.push_str(" ← THIS IS A HUB");
} else if hub_ratio > 0.6 {
out.push_str(" ← mostly hub-connected, needs lateral links");
}
out.push('\n');
// Content (truncated for large nodes)
let content = &node.content;
if content.len() > 1500 {
let truncated = crate::util::truncate(content, 1500, "\n[...]");
out.push_str(&format!("\nContent ({} chars, truncated):\n{}\n\n",
content.len(), truncated));
} else {
out.push_str(&format!("\nContent:\n{}\n\n", content));
}
// Neighbors
let neighbors = graph.neighbors(&item.key);
if !neighbors.is_empty() {
out.push_str("Neighbors:\n");
for (n, strength) in neighbors.iter().take(15) {
let n_cc = graph.clustering_coefficient(n);
let n_community = store.nodes.get(n.as_str())
.and_then(|n| n.community_id);
out.push_str(&format!(" - {} (str={:.2}, cc={:.3}",
n, strength, n_cc));
if let Some(c) = n_community {
out.push_str(&format!(", c{}", c));
}
out.push_str(")\n");
}
}
// Suggested link targets: text-similar semantic nodes not already neighbors
let neighbor_keys: std::collections::HashSet<&str> = neighbors.iter()
.map(|(k, _)| k.as_str()).collect();
let mut candidates: Vec<(&str, f32)> = store.nodes.iter()
.filter(|(k, _)| {
*k != &item.key
&& !neighbor_keys.contains(k.as_str())
})
.map(|(k, n)| {
let sim = similarity::cosine_similarity(content, &n.content);
(k.as_str(), sim)
})
.filter(|(_, sim)| *sim > 0.1)
.collect();
candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
candidates.truncate(8);
if !candidates.is_empty() {
out.push_str("\nSuggested link targets (by text similarity, not yet linked):\n");
for (k, sim) in &candidates {
let is_hub = graph.degree(k) >= hub_thresh;
out.push_str(&format!(" - {} (sim={:.3}{})\n",
k, sim, if is_hub { ", HUB" } else { "" }));
}
}
out.push_str("\n---\n\n");
}
out
}
/// Format health data for the health agent prompt
fn format_health_section(store: &Store, graph: &Graph) -> String {
use crate::graph;
let health = graph::health_report(graph, store);
let mut out = health;
out.push_str("\n\n## Weight distribution\n");
// Weight histogram
let mut buckets = [0u32; 10]; // 0.0-0.1, 0.1-0.2, ..., 0.9-1.0
for node in store.nodes.values() {
let bucket = ((node.weight * 10.0) as usize).min(9);
buckets[bucket] += 1;
}
for (i, &count) in buckets.iter().enumerate() {
let lo = i as f32 / 10.0;
let hi = (i + 1) as f32 / 10.0;
let bar = "".repeat((count as usize) / 10);
out.push_str(&format!(" {:.1}-{:.1}: {:4} {}\n", lo, hi, count, bar));
}
// Near-prune nodes
let near_prune: Vec<_> = store.nodes.iter()
.filter(|(_, n)| n.weight < 0.15)
.map(|(k, n)| (k.clone(), n.weight))
.collect();
if !near_prune.is_empty() {
out.push_str(&format!("\n## Near-prune nodes ({} total)\n", near_prune.len()));
for (k, w) in near_prune.iter().take(20) {
out.push_str(&format!(" [{:.3}] {}\n", w, k));
}
}
// Community sizes
let communities = graph.communities();
let mut comm_sizes: std::collections::HashMap<u32, Vec<String>> = std::collections::HashMap::new();
for (key, &label) in communities {
comm_sizes.entry(label).or_default().push(key.clone());
}
let mut sizes: Vec<_> = comm_sizes.iter()
.map(|(id, members)| (*id, members.len(), members.clone()))
.collect();
sizes.sort_by(|a, b| b.1.cmp(&a.1));
out.push_str("\n## Largest communities\n");
for (id, size, members) in sizes.iter().take(10) {
out.push_str(&format!(" Community {} ({} nodes): ", id, size));
let sample: Vec<_> = members.iter().take(5).map(|s| s.as_str()).collect();
out.push_str(&sample.join(", "));
if *size > 5 { out.push_str(", ..."); }
out.push('\n');
}
out
}
/// Format interference pairs for the separator agent prompt
fn format_pairs_section(
pairs: &[(String, String, f32)],
store: &Store,
graph: &Graph,
) -> String {
let mut out = String::new();
let communities = graph.communities();
for (a, b, sim) in pairs {
out.push_str(&format!("## Pair: similarity={:.3}\n", sim));
let ca = communities.get(a).map(|c| format!("c{}", c)).unwrap_or_else(|| "?".into());
let cb = communities.get(b).map(|c| format!("c{}", c)).unwrap_or_else(|| "?".into());
// Node A
out.push_str(&format!("\n### {} ({})\n", a, ca));
if let Some(node) = store.nodes.get(a) {
let content = crate::util::truncate(&node.content, 500, "...");
out.push_str(&format!("Weight: {:.2}\n{}\n",
node.weight, content));
}
// Node B
out.push_str(&format!("\n### {} ({})\n", b, cb));
if let Some(node) = store.nodes.get(b) {
let content = crate::util::truncate(&node.content, 500, "...");
out.push_str(&format!("Weight: {:.2}\n{}\n",
node.weight, content));
}
out.push_str("\n---\n\n");
}
out
}
/// Run agent consolidation on top-priority nodes
pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> {
let graph = store.build_graph();
let items = replay_queue(store, count);
if items.is_empty() {
println!("No nodes to consolidate.");
return Ok(());
}
let nodes_section = format_nodes_section(store, &items, &graph);
if auto {
let prompt = load_prompt("replay", &[("{{NODES}}", &nodes_section)])?;
println!("{}", prompt);
} else {
// Interactive: show what needs attention and available agent types
println!("Consolidation batch ({} nodes):\n", items.len());
for item in &items {
let node_type = store.nodes.get(&item.key)
.map(|n| if matches!(n.node_type, crate::store::NodeType::EpisodicSession) { "episodic" } else { "semantic" })
.unwrap_or("?");
println!(" [{:.3}] {} (cc={:.3}, interval={}d, type={})",
item.priority, item.key, item.cc, item.interval_days, node_type);
}
// Also show interference pairs
let pairs = detect_interference(store, &graph, 0.6);
if !pairs.is_empty() {
println!("\nInterfering pairs ({}):", pairs.len());
for (a, b, sim) in pairs.iter().take(5) {
            println!(" [{:.3}] {} ↔ {}", sim, a, b);
}
}
println!("\nAgent prompts:");
println!(" --auto Generate replay agent prompt");
println!(" --agent replay Replay agent (schema assimilation)");
println!(" --agent linker Linker agent (relational binding)");
println!(" --agent separator Separator agent (pattern separation)");
println!(" --agent transfer Transfer agent (CLS episodic→semantic)");
println!(" --agent health Health agent (synaptic homeostasis)");
}
Ok(())
}
/// Generate a specific agent prompt with filled-in data
pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> {
let graph = store.build_graph();
let topology = format_topology_header(&graph);
let emb = spectral::load_embedding().ok();
match agent {
"replay" => {
let items = replay_queue_with_graph(store, count, &graph, emb.as_ref());
let nodes_section = format_nodes_section(store, &items, &graph);
load_prompt("replay", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])
}
"linker" => {
// Filter to episodic entries
let mut items = replay_queue_with_graph(store, count * 2, &graph, emb.as_ref());
items.retain(|item| {
store.nodes.get(&item.key)
.map(|n| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
.unwrap_or(false)
});
items.truncate(count);
let nodes_section = format_nodes_section(store, &items, &graph);
load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])
}
"separator" => {
let mut pairs = detect_interference(store, &graph, 0.5);
pairs.truncate(count);
let pairs_section = format_pairs_section(&pairs, store, &graph);
load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)])
}
"transfer" => {
// Recent episodic entries
let mut episodes: Vec<_> = store.nodes.iter()
.filter(|(_, n)| matches!(n.node_type, crate::store::NodeType::EpisodicSession))
.map(|(k, n)| (k.clone(), n.timestamp))
.collect();
episodes.sort_by(|a, b| b.1.cmp(&a.1));
episodes.truncate(count);
let episode_keys: Vec<_> = episodes.iter().map(|(k, _)| k.clone()).collect();
let items: Vec<ReplayItem> = episode_keys.iter()
.filter_map(|k| {
let node = store.nodes.get(k)?;
Some(ReplayItem {
key: k.clone(),
priority: consolidation_priority(store, k, &graph, None),
interval_days: node.spaced_repetition_interval,
emotion: node.emotion,
cc: graph.clustering_coefficient(k),
classification: "unknown",
outlier_score: 0.0,
})
})
.collect();
let episodes_section = format_nodes_section(store, &items, &graph);
load_prompt("transfer", &[("{{TOPOLOGY}}", &topology), ("{{EPISODES}}", &episodes_section)])
}
"health" => {
let health_section = format_health_section(store, &graph);
load_prompt("health", &[("{{TOPOLOGY}}", &topology), ("{{HEALTH}}", &health_section)])
}
_ => Err(format!("Unknown agent: {}. Use: replay, linker, separator, transfer, health", agent)),
}
}
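// Example (hedged sketch; the surrounding CLI wiring lives elsewhere):
//
//   let store = Store::load()?;
//   let prompt = agent_prompt(&store, "replay", 10)?;
//
// which fills {{TOPOLOGY}} and {{NODES}} in replay.md under the configured
// prompts_dir with live graph data.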