diff --git a/Cargo.lock b/Cargo.lock index d8b0221..a2c0262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,6 +555,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "skillratings", "tokenizers", "tokio", "tokio-rustls", @@ -2801,6 +2802,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" +[[package]] +name = "skillratings" +version = "0.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a6ee7559737c1adcd9184f168a04dc360c84878907c3ecc5c33c2320be1d47a" + [[package]] name = "slab" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index 2c5246f..c97840f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ capnp = "0.25" capnp-rpc = "0.25" tokenizers = "0.21" +skillratings = "0.28" http = "1" hyper = { version = "1", features = ["client", "http1"] } diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 532a659..1545b04 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use anyhow::Result; use api::ApiClient; -use context::{AstNode, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role}; +use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role}; use crate::mind::log::ConversationLog; @@ -105,6 +105,9 @@ pub async fn start_activity(agent: &Arc, label: impl Into) -> Act /// Result of a single agent turn. pub struct TurnResult { + /// The text response (already sent through UI channel). + #[allow(dead_code)] + pub text: String, /// Whether the model called yield_to_user during this turn. pub yield_requested: bool, /// Whether any tools (other than yield_to_user) were called. @@ -442,12 +445,24 @@ impl Agent { } // Text-only response — extract text and return + let text = { + let ctx = agent.context.lock().await; + let children = ctx.conversation()[branch_idx].children(); + children.iter() + .filter_map(|c| c.leaf()) + .filter(|l| matches!(l.body(), NodeBody::Content(_))) + .map(|l| l.body().text()) + .collect::>() + .join("") + }; + let mut st = agent.state.lock().await; if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; } if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); } if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; } return Ok(TurnResult { + text, yield_requested: ds.yield_requested, had_tool_calls: ds.had_tool_calls, tool_errors: ds.tool_errors, diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 1baa08e..f218080 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -244,7 +244,7 @@ impl AutoAgent { pub async fn run( &mut self, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, - ) -> Result<(), String> { + ) -> Result { let config = crate::config::get(); let base_url = config.api_base_url.as_deref().unwrap_or(""); let api_key = config.api_key.as_deref().unwrap_or(""); @@ -288,7 +288,7 @@ impl AutoAgent { pub async fn run_shared( &mut self, agent: &std::sync::Arc, - ) -> Result<(), String> { + ) -> Result { let mut backend = Backend(agent.clone()); self.run_with_backend(&mut backend, None).await } @@ -301,7 +301,7 @@ impl AutoAgent { memory_keys: &[String], state: &std::collections::BTreeMap, recently_written: &[String], - ) -> Result<(), String> { + ) -> Result { let resolved_steps: Vec = self.steps.iter().map(|s| AutoStep { prompt: resolve_prompt(&s.prompt, memory_keys, state, recently_written), phase: s.phase.clone(), @@ -347,23 +347,67 @@ impl AutoAgent { &mut self, backend: &mut Backend, bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, - ) -> Result<(), String> { + ) -> Result { dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len()); + self.turn = 0; + self.current_phase = self.steps.first() + .map(|s| s.phase.clone()).unwrap_or_default(); + let mut next_step = 0; - for (i, step) in self.steps.iter().enumerate() { - self.turn = i + 1; - self.current_phase = step.phase.clone(); - - if let Some(ref check) = bail_fn { - check(i)?; - } - - backend.push_node(AstNode::system_msg(&step.prompt)).await; - Agent::turn(backend.0.clone()).await - .map_err(|e| format!("{}: {}", self.name, e))?; + if next_step < self.steps.len() { + backend.push_node( + AstNode::system_msg(&self.steps[next_step].prompt)).await; + next_step += 1; } - Ok(()) + let max_turns = 50 * self.steps.len().max(1); + + for _ in 0..max_turns { + self.turn += 1; + + let result = match Agent::turn(backend.0.clone()).await { + Ok(r) => r, + Err(e) if super::context::is_context_overflow(&e) => { + dbglog!("[auto] {} context full, stopping gracefully", self.name); + return Ok(String::new()); + } + Err(e) => return Err(format!("{}: {}", self.name, e)), + }; + + if result.had_tool_calls { + continue; + } + + let text = result.text; + if text.is_empty() { + dbglog!("[auto] {} empty response, retrying", self.name); + backend.push_node(AstNode::system_msg( + "Your previous response was empty. \ + Please respond with text or use a tool." + )).await; + continue; + } + + dbglog!("[auto] {} response: {}", + self.name, &text[..text.floor_char_boundary(text.len().min(200))]); + + if next_step < self.steps.len() { + if let Some(ref check) = bail_fn { + check(next_step)?; + } + self.current_phase = self.steps[next_step].phase.clone(); + backend.push_node( + AstNode::system_msg(&self.steps[next_step].prompt)).await; + next_step += 1; + dbglog!("[auto] {} step {}/{}", + self.name, next_step, self.steps.len()); + continue; + } + + return Ok(text); + } + + Err(format!("{}: exceeded {} tool turns", self.name, max_turns)) } } @@ -374,6 +418,7 @@ impl AutoAgent { /// Result of running a single agent. pub struct AgentResult { + pub output: String, pub node_keys: Vec, /// Directory containing output() files from the agent run. pub state_dir: PathBuf, @@ -511,11 +556,12 @@ pub fn run_one_agent( Ok(()) }; - call_api_with_tools_sync( + let output = call_api_with_tools_sync( agent_name, &prompts, &step_phases, def.temperature, def.priority, &effective_tools, Some(&bail_fn))?; Ok(AgentResult { + output, node_keys: agent_batch.node_keys, state_dir, }) @@ -535,7 +581,7 @@ pub async fn call_api_with_tools( priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, -) -> Result<(), String> { +) -> Result { let steps: Vec = prompts.iter().zip( phases.iter().map(String::as_str) .chain(std::iter::repeat("")) @@ -564,7 +610,7 @@ pub fn call_api_with_tools_sync( priority: i32, tools: &[agent_tools::Tool], bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, -) -> Result<(), String> { +) -> Result { std::thread::scope(|s| { s.spawn(|| { let rt = tokio::runtime::Builder::new_current_thread() diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index 588721b..f526d44 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -101,27 +101,11 @@ pub fn journal_tools() -> [super::Tool; 3] { } }"#, handler: Arc::new(|_a, v| Box::pin(async move { journal_tail(&v).await })) }, - Tool { name: "journal_new", description: "Start a new journal/digest entry.", - parameters_json: r#"{ - "type": "object", - "properties": { - "name": {"type": "string", "description": "Short node name (becomes the key)"}, - "title": {"type": "string", "description": "Descriptive title"}, - "body": {"type": "string", "description": "Entry body"}, - "level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly", "default": 0} - }, - "required": ["name", "title", "body"] - }"#, + Tool { name: "journal_new", description: "Start a new journal entry.", + parameters_json: r#"{"type":"object","properties":{"name":{"type":"string","description":"Short node name (becomes the key)"},"title":{"type":"string","description":"Descriptive title"},"body":{"type":"string","description":"Entry body"}},"required":["name","title","body"]}"#, handler: Arc::new(|a, v| Box::pin(async move { journal_new(&a, &v).await })) }, - Tool { name: "journal_update", description: "Append text to the most recent entry at a level.", - parameters_json: r#"{ - "type": "object", - "properties": { - "body": {"type": "string", "description": "Text to append"}, - "level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly", "default": 0} - }, - "required": ["body"] - }"#, + Tool { name: "journal_update", description: "Append text to the most recent journal entry.", + parameters_json: r#"{"type":"object","properties":{"body":{"type":"string","description":"Text to append"}},"required":["body"]}"#, handler: Arc::new(|a, v| Box::pin(async move { journal_update(&a, &v).await })) }, ] } @@ -373,20 +357,10 @@ async fn journal_tail(args: &serde_json::Value) -> Result { query(&serde_json::json!({"query": q, "format": format})).await } -fn level_to_node_type(level: i64) -> crate::store::NodeType { - match level { - 1 => crate::store::NodeType::EpisodicDaily, - 2 => crate::store::NodeType::EpisodicWeekly, - 3 => crate::store::NodeType::EpisodicMonthly, - _ => crate::store::NodeType::EpisodicSession, - } -} - async fn journal_new(agent: &Option>, args: &serde_json::Value) -> Result { let name = get_str(args, "name")?; let title = get_str(args, "title")?; let body = get_str(args, "body")?; - let level = args.get("level").and_then(|v| v.as_i64()).unwrap_or(0); let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M"); let content = format!("## {} — {}\n\n{}", ts, title, body); @@ -412,7 +386,7 @@ async fn journal_new(agent: &Option>, args: base_key.to_string() }; let mut node = crate::store::new_node(&key, &content); - node.node_type = level_to_node_type(level); + node.node_type = crate::store::NodeType::EpisodicSession; node.provenance = get_provenance(agent).await; store.upsert_node(node).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; @@ -422,16 +396,14 @@ async fn journal_new(agent: &Option>, args: async fn journal_update(agent: &Option>, args: &serde_json::Value) -> Result { let body = get_str(args, "body")?; - let level = args.get("level").and_then(|v| v.as_i64()).unwrap_or(0); - let node_type = level_to_node_type(level); let arc = cached_store().await?; let mut store = arc.lock().await; let latest_key = store.nodes.values() - .filter(|n| n.node_type == node_type) + .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .max_by_key(|n| n.created_at) .map(|n| n.key.clone()); let Some(key) = latest_key else { - anyhow::bail!("no entry at level {} to update — use journal_new first", level); + anyhow::bail!("no journal entry to update — use journal_new first"); }; let existing = store.nodes.get(&key).unwrap().content.clone(); let new_content = format!("{}\n\n{}", existing.trim_end(), body); diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 1a8fe4d..d2d05c8 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -1,7 +1,6 @@ // cli/agent.rs — agent subcommand handlers use crate::store; -use crate::subconscious::digest; pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<(), String> { // Mark as agent so tool calls (e.g. poc-memory render) don't @@ -57,6 +56,23 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option Ok(()) } +pub fn cmd_consolidate_batch(count: usize, auto: bool, agent: Option) -> Result<(), String> { + let store = store::Store::load()?; + + if let Some(agent_name) = agent { + let batch = crate::agents::prompts::agent_prompt(&store, &agent_name, count)?; + for (i, s) in batch.steps.iter().enumerate() { + if batch.steps.len() > 1 { + println!("=== STEP {} ({}) ===\n", i + 1, s.phase); + } + println!("{}", s.prompt); + } + Ok(()) + } else { + crate::agents::prompts::consolidation_batch(&store, count, auto) + } +} + pub fn cmd_replay_queue(count: usize) -> Result<(), String> { let store = store::Store::load()?; let queue = crate::neuro::replay_queue(&store, count); @@ -69,9 +85,21 @@ pub fn cmd_replay_queue(count: usize) -> Result<(), String> { Ok(()) } +pub fn cmd_consolidate_session() -> Result<(), String> { + let store = store::Store::load()?; + let plan = crate::neuro::consolidation_plan(&store); + println!("{}", crate::neuro::format_plan(&plan)); + Ok(()) +} + +pub fn cmd_consolidate_full() -> Result<(), String> { + let mut store = store::Store::load()?; + crate::consolidate::consolidate_full(&mut store) +} + pub fn cmd_digest_links(do_apply: bool) -> Result<(), String> { let store = store::Store::load()?; - let links = digest::parse_all_digest_links(&store); + let links = crate::digest::parse_all_digest_links(&store); drop(store); println!("Found {} unique links from digest nodes", links.len()); @@ -87,7 +115,269 @@ pub fn cmd_digest_links(do_apply: bool) -> Result<(), String> { } let mut store = store::Store::load()?; - let (applied, skipped, fallbacks) = digest::apply_digest_links(&mut store, &links); + let (applied, skipped, fallbacks) = crate::digest::apply_digest_links(&mut store, &links); println!("\nApplied: {} ({} file-level fallbacks) Skipped: {}", applied, fallbacks, skipped); Ok(()) } + +pub fn cmd_journal_enrich(_jsonl_path: &str, _entry_text: &str, _grep_line: usize) -> Result<(), String> { + Err("journal-enrich has been removed — use the observation agent instead.".into()) +} + +pub fn cmd_apply_consolidation(_do_apply: bool, _report_file: Option<&str>) -> Result<(), String> { + Err("apply-consolidation has been removed — agents now apply changes via tool calls directly.".into()) +} + +pub fn cmd_knowledge_loop(_max_cycles: usize, _batch_size: usize, _window: usize, _max_depth: i32) -> Result<(), String> { + Err("knowledge-loop has been removed — agents now use tool calls directly. Use `poc-memory agent run` instead.".into()) +} + +pub fn cmd_fact_mine(_path: &str, _batch: bool, _dry_run: bool, _output_file: Option<&str>, _min_messages: usize) -> Result<(), String> { + Err("fact-mine has been removed — use the observation agent instead.".into()) +} + +pub fn cmd_fact_mine_store(_path: &str) -> Result<(), String> { + Err("fact-mine-store has been removed — use the observation agent instead.".into()) +} + +/// Sample recent actions from each agent type, sort by quality using +/// LLM pairwise comparison, report per-type rankings. +/// Elo ratings file path +fn elo_path() -> std::path::PathBuf { + crate::config::get().data_dir.join("agent-elo.json") +} + +/// Load persisted Elo ratings, or initialize at 1000.0 +fn load_elo_ratings(agent_types: &[&str]) -> std::collections::HashMap { + let path = elo_path(); + let mut ratings: std::collections::HashMap = std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default(); + for t in agent_types { + ratings.entry(t.to_string()).or_insert(1000.0); + } + ratings +} + +fn save_elo_ratings(ratings: &std::collections::HashMap) { + let path = elo_path(); + if let Ok(json) = serde_json::to_string_pretty(ratings) { + let _ = std::fs::write(path, json); + } +} + +pub fn cmd_evaluate_agents(matchups: usize, model: &str, dry_run: bool) -> Result<(), String> { + use skillratings::elo::{elo, EloConfig, EloRating}; + use skillratings::Outcomes; + + let store = store::Store::load()?; + + let agent_types: Vec<&str> = vec![ + "linker", "organize", "distill", "separator", + "split", "rename", + ]; + + // Load agent prompt files + let prompts_dir = { + let repo = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("agents"); + if repo.is_dir() { repo } else { crate::store::memory_dir().join("agents") } + }; + + // Collect recent actions per agent type + let mut actions: std::collections::HashMap> = std::collections::HashMap::new(); + + for agent_type in &agent_types { + let prompt_file = prompts_dir.join(format!("{}.agent", agent_type)); + let agent_prompt = std::fs::read_to_string(&prompt_file) + .unwrap_or_default() + .lines().skip(1).collect::>().join("\n"); + let agent_prompt = crate::util::truncate(&agent_prompt, 500, "..."); + + let prefix = format!("_consolidate-{}", agent_type); + let mut keys: Vec<(String, i64)> = store.nodes.iter() + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(k, n)| (k.clone(), n.timestamp)) + .collect(); + keys.sort_by(|a, b| b.1.cmp(&a.1)); + keys.truncate(20); // pool of recent actions to sample from + + let mut type_actions = Vec::new(); + for (key, _) in &keys { + let report = store.nodes.get(key) + .map(|n| n.content.clone()) + .unwrap_or_default(); + + let mut target_content = String::new(); + let mut seen = std::collections::HashSet::new(); + for word in report.split_whitespace() { + let clean = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '-' && c != '_'); + if clean.len() > 10 && seen.insert(clean.to_string()) && store.nodes.contains_key(clean) + && let Some(node) = store.nodes.get(clean) { + let preview = crate::util::truncate(&node.content, 200, "..."); + target_content.push_str(&format!("\n### {}\n{}\n", clean, preview)); + if target_content.len() > 1500 { break; } + } + } + + let context = format!( + "## Agent instructions\n{}\n\n## Report output\n{}\n\n## Affected nodes\n{}", + agent_prompt, + crate::util::truncate(&report, 1000, "..."), + if target_content.is_empty() { "(none found)".into() } else { target_content } + ); + type_actions.push((key.clone(), context)); + } + actions.insert(agent_type.to_string(), type_actions); + } + + // Filter to types that have at least 1 action + let active_types: Vec<&str> = agent_types.iter() + .filter(|t| actions.get(**t).map(|a| !a.is_empty()).unwrap_or(false)) + .copied() + .collect(); + + if active_types.len() < 2 { + return Err("Need at least 2 agent types with actions".into()); + } + + eprintln!("Evaluating {} agent types with {} matchups (model={})", + active_types.len(), matchups, model); + + if dry_run { + let t1 = active_types[0]; + let t2 = active_types[active_types.len() - 1]; + let a1 = &actions[t1][0]; + let a2 = &actions[t2][0]; + let sample_a = (t1.to_string(), a1.0.clone(), a1.1.clone()); + let sample_b = (t2.to_string(), a2.0.clone(), a2.1.clone()); + println!("=== DRY RUN: Example comparison ===\n"); + println!("{}", build_compare_prompt(&sample_a, &sample_b)); + return Ok(()); + } + + // Load persisted ratings + let mut ratings = load_elo_ratings(&agent_types); + let config = EloConfig { k: 32.0 }; + // Simple but adequate RNG: xorshift32 + let mut rng = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH).unwrap().subsec_nanos() | 1; + let mut next_rng = || -> usize { + rng ^= rng << 13; + rng ^= rng >> 17; + rng ^= rng << 5; + rng as usize + }; + + for i in 0..matchups { + // Pick two different random agent types + let idx_a = next_rng() % active_types.len(); + let mut idx_b = next_rng() % active_types.len(); + if idx_b == idx_a { idx_b = (idx_b + 1) % active_types.len(); } + + let type_a = active_types[idx_a]; + let type_b = active_types[idx_b]; + + // Pick random recent action from each + let acts_a = &actions[type_a]; + let acts_b = &actions[type_b]; + let act_a = &acts_a[next_rng() % acts_a.len()]; + let act_b = &acts_b[next_rng() % acts_b.len()]; + + let sample_a = (type_a.to_string(), act_a.0.clone(), act_a.1.clone()); + let sample_b = (type_b.to_string(), act_b.0.clone(), act_b.1.clone()); + + let result = llm_compare(&sample_a, &sample_b, model); + + let rating_a = EloRating { rating: ratings[type_a] }; + let rating_b = EloRating { rating: ratings[type_b] }; + + let outcome = match result { + Ok(std::cmp::Ordering::Less) => Outcomes::WIN, // A wins + Ok(std::cmp::Ordering::Greater) => Outcomes::LOSS, // B wins + _ => Outcomes::WIN, // default to A + }; + + let (new_a, new_b) = elo(&rating_a, &rating_b, &outcome, &config); + ratings.insert(type_a.to_string(), new_a.rating); + ratings.insert(type_b.to_string(), new_b.rating); + + eprint!(" matchup {}/{}: {} vs {} → {}\r", + i + 1, matchups, type_a, type_b, + if matches!(outcome, Outcomes::WIN) { type_a } else { type_b }); + } + eprintln!(); + + // Save updated ratings + save_elo_ratings(&ratings); + + // Print rankings + let mut ranked: Vec<_> = ratings.iter().collect(); + ranked.sort_by(|a, b| b.1.total_cmp(a.1)); + + println!("\nAgent Elo Ratings (after {} matchups):\n", matchups); + for (agent_type, rating) in &ranked { + let bar_len = ((*rating - 800.0) / 10.0).max(0.0) as usize; + let bar = "#".repeat(bar_len.min(40)); + println!(" {:12} {:7.1} {}", agent_type, rating, bar); + } + + Ok(()) +} + +fn build_compare_prompt( + a: &(String, String, String), + b: &(String, String, String), +) -> String { + if a.0 == b.0 { + // Same agent type — show instructions once + // Split context at "## Report output" to extract shared prompt + let split_a: Vec<&str> = a.2.splitn(2, "## Report output").collect(); + let split_b: Vec<&str> = b.2.splitn(2, "## Report output").collect(); + let shared_prompt = split_a.first().unwrap_or(&""); + let report_a = split_a.get(1).unwrap_or(&""); + let report_b = split_b.get(1).unwrap_or(&""); + format!( + "Compare two actions from the same {} agent. Which was better?\n\n\ + {}\n\n\ + ## Action A\n## Report output{}\n\n\ + ## Action B\n## Report output{}\n\n\ + Say which is better and why in 1-2 sentences, then end with:\n\ + BETTER: A or BETTER: B\n\ + You must pick one. No ties.", + a.0, shared_prompt, report_a, report_b + ) + } else { + format!( + "Compare these two memory graph agent actions. Which one was better \ + for building a useful, well-organized knowledge graph?\n\n\ + ## Action A ({} agent)\n{}\n\n\ + ## Action B ({} agent)\n{}\n\n\ + Say which is better and why in 1-2 sentences, then end with:\n\ + BETTER: A or BETTER: B\n\ + You must pick one. No ties.", + a.0, a.2, b.0, b.2 + ) + } +} + +fn llm_compare( + a: &(String, String, String), + b: &(String, String, String), + model: &str, +) -> Result { + let prompt = build_compare_prompt(a, b); + + let _ = model; // model selection handled by API backend config + let response = crate::agent::oneshot::call_api_with_tools_sync( + "compare", &[prompt], &[], None, 10, &[], None)?; + let response = response.trim().to_uppercase(); + + if response.contains("BETTER: B") { + Ok(std::cmp::Ordering::Greater) + } else { + // Default to A (includes "BETTER: A" and any unparseable response) + Ok(std::cmp::Ordering::Less) + } +} + diff --git a/src/cli/graph.rs b/src/cli/graph.rs index 3b2e3d9..b33d2ad 100644 --- a/src/cli/graph.rs +++ b/src/cli/graph.rs @@ -202,6 +202,17 @@ pub fn cmd_link_impact(source: &str, target: &str) -> Result<(), String> { Ok(()) } +pub fn cmd_link_audit(apply: bool) -> Result<(), String> { + let mut store = store::Store::load()?; + let stats = crate::audit::link_audit(&mut store, apply)?; + println!("\n{}", "=".repeat(60)); + println!("Link audit complete:"); + println!(" Kept: {} Deleted: {} Retargeted: {} Weakened: {} Strengthened: {} Errors: {}", + stats.kept, stats.deleted, stats.retargeted, stats.weakened, stats.strengthened, stats.errors); + println!("{}", "=".repeat(60)); + Ok(()) +} + pub fn cmd_trace(key: &[String]) -> Result<(), String> { if key.is_empty() { return Err("trace requires a key".into()); diff --git a/src/cli/journal.rs b/src/cli/journal.rs index 67da9e3..b0a4fd9 100644 --- a/src/cli/journal.rs +++ b/src/cli/journal.rs @@ -90,21 +90,48 @@ pub fn find_current_transcript() -> Option { newest.map(|(_, p)| p.to_string_lossy().to_string()) } -fn journal_tail_query(store: &crate::store::Store, query: &str, n: usize, full: bool) -> Result<(), String> { - let graph = store.build_graph(); - let stages = crate::query_parser::parse_stages(query)?; - let results = crate::search::run_query(&stages, vec![], &graph, store, false, n); +fn journal_tail_entries(store: &crate::store::Store, n: usize, full: bool) -> Result<(), String> { + let date_re = regex::Regex::new(r"(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2})").unwrap(); + let key_date_re = regex::Regex::new(r"j-(\d{4}-\d{2}-\d{2}[t-]\d{2}-\d{2})").unwrap(); - // Query sorts desc and limits, so reverse to show oldest-to-newest - for (key, _score) in results.into_iter().rev() { - let Some(node) = store.nodes.get(&key) else { continue }; - let ts = if node.created_at > 0 { - crate::store::format_datetime(node.created_at) - } else if node.timestamp > 0 { - crate::store::format_datetime(node.timestamp) + let normalize_date = |s: &str| -> String { + let s = s.replace('t', "T"); + if s.len() >= 16 { + format!("{}T{}", &s[..10], s[11..].replace('-', ":")) } else { - node.key.clone() - }; + s + } + }; + + let extract_sort = |node: &crate::store::Node| -> (i64, String) { + if node.created_at > 0 { + return (node.created_at, crate::store::format_datetime(node.created_at)); + } + if let Some(caps) = key_date_re.captures(&node.key) { + return (0, normalize_date(&caps[1])); + } + if let Some(caps) = date_re.captures(&node.content) { + return (0, normalize_date(&caps[1])); + } + (node.timestamp, crate::store::format_datetime(node.timestamp)) + }; + + let mut journal: Vec<_> = store.nodes.values() + .filter(|node| node.node_type == crate::store::NodeType::EpisodicSession) + .collect(); + journal.sort_by(|a, b| { + let (at, as_) = extract_sort(a); + let (bt, bs) = extract_sort(b); + if at > 0 && bt > 0 { + at.cmp(&bt) + } else { + as_.cmp(&bs) + } + }); + + let skip = if journal.len() > n { journal.len() - n } else { 0 }; + for node in journal.iter().skip(skip) { + let (_, ts) = extract_sort(node); let title = extract_title(&node.content); if full { println!("--- [{}] {} ---\n{}\n", ts, title, node.content); @@ -115,14 +142,44 @@ fn journal_tail_query(store: &crate::store::Store, query: &str, n: usize, full: Ok(()) } +fn journal_tail_digests(store: &crate::store::Store, node_type: crate::store::NodeType, n: usize, full: bool) -> Result<(), String> { + let mut digests: Vec<_> = store.nodes.values() + .filter(|node| node.node_type == node_type) + .collect(); + digests.sort_by(|a, b| { + if a.timestamp > 0 && b.timestamp > 0 { + a.timestamp.cmp(&b.timestamp) + } else { + a.key.cmp(&b.key) + } + }); + + let skip = if digests.len() > n { digests.len() - n } else { 0 }; + for node in digests.iter().skip(skip) { + let label = &node.key; + let title = extract_title(&node.content); + if full { + println!("--- [{}] {} ---\n{}\n", label, title, node.content); + } else { + println!("[{}] {}", label, title); + } + } + Ok(()) +} + pub fn cmd_journal_tail(n: usize, full: bool, level: u8) -> Result<(), String> { let store = crate::store::Store::load()?; - let query = format!("all | type:{} | sort:timestamp | limit:{}", - match level { 0 => "episodic", 1 => "daily", 2 => "weekly", _ => "monthly" }, - n - ); - journal_tail_query(&store, &query, n, full) + if level == 0 { + journal_tail_entries(&store, n, full) + } else { + let node_type = match level { + 1 => crate::store::NodeType::EpisodicDaily, + 2 => crate::store::NodeType::EpisodicWeekly, + _ => crate::store::NodeType::EpisodicMonthly, + }; + journal_tail_digests(&store, node_type, n, full) + } } pub fn cmd_journal_write(name: &str, text: &[String]) -> Result<(), String> { diff --git a/src/hippocampus/cursor.rs b/src/hippocampus/cursor.rs new file mode 100644 index 0000000..6cbeecd --- /dev/null +++ b/src/hippocampus/cursor.rs @@ -0,0 +1,315 @@ +// Spatial memory cursor — a persistent pointer into the knowledge graph. +// +// The cursor maintains a "you are here" position that persists across +// sessions. Navigation moves through three dimensions: +// - Temporal: forward/back among same-type nodes by timestamp +// - Hierarchical: up/down the digest tree (journal→daily→weekly→monthly) +// - Spatial: sideways along graph edges to linked nodes +// +// This is the beginning of place cells — the hippocampus doesn't just +// store, it maintains a map. The cursor is the map's current position. + +use crate::store::{self, Node, Store}; + +use std::path::PathBuf; + +fn cursor_path() -> PathBuf { + store::memory_dir().join("cursor") +} + +/// Read the current cursor position (node key), if any. +pub fn get() -> Option { + std::fs::read_to_string(cursor_path()) + .ok() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) +} + +/// Set the cursor to a node key. +pub fn set(key: &str) -> Result<(), String> { + std::fs::write(cursor_path(), format!("{}\n", key)) + .map_err(|e| format!("write cursor: {}", e)) +} + +/// Clear the cursor. +pub fn clear() -> Result<(), String> { + let p = cursor_path(); + if p.exists() { + std::fs::remove_file(&p) + .map_err(|e| format!("clear cursor: {}", e))?; + } + Ok(()) +} + +/// Temporal neighbors: nodes of the same type, sorted by timestamp. +/// Returns (prev, next) keys relative to the given node. +pub(crate) fn temporal_neighbors(store: &Store, key: &str) -> (Option, Option) { + let Some(node) = store.nodes.get(key) else { return (None, None) }; + let node_type = node.node_type; + + let mut same_type: Vec<(&str, i64)> = store.nodes.iter() + .filter(|(_, n)| !n.deleted && n.node_type == node_type && n.timestamp > 0) + .map(|(k, n)| (k.as_str(), n.timestamp)) + .collect(); + same_type.sort_by_key(|(_, t)| *t); + + let pos = same_type.iter().position(|(k, _)| *k == key); + let prev = pos.and_then(|i| if i > 0 { Some(same_type[i - 1].0.to_string()) } else { None }); + let next = pos.and_then(|i| same_type.get(i + 1).map(|(k, _)| k.to_string())); + + (prev, next) +} + +/// Digest hierarchy: find the parent digest for a node. +/// Journal → daily, daily → weekly, weekly → monthly. +pub(crate) fn digest_parent(store: &Store, key: &str) -> Option { + let node = store.nodes.get(key)?; + + let parent_type = match node.node_type { + store::NodeType::EpisodicSession => store::NodeType::EpisodicDaily, + store::NodeType::EpisodicDaily => store::NodeType::EpisodicWeekly, + store::NodeType::EpisodicWeekly => store::NodeType::EpisodicMonthly, + _ => return None, + }; + + // Look for structural links first (digest:structural provenance) + for r in &store.relations { + if r.deleted { continue; } + if r.source_key == key + && let Some(target) = store.nodes.get(&r.target_key) + && target.node_type == parent_type { + return Some(r.target_key.clone()); + } + } + + // Fallback: match by date for journal→daily + if node.node_type == store::NodeType::EpisodicSession { + // Try extracting date from timestamp first, then from key + let mut dates = Vec::new(); + if node.timestamp > 0 { + dates.push(store::format_date(node.timestamp)); + } + // Extract date from created_at timestamp + if node.created_at > 0 { + let created_date = store::format_date(node.created_at); + if !dates.contains(&created_date) { + dates.push(created_date); + } + } + for date in &dates { + for prefix in [&format!("daily-{}", date), &format!("digest#daily#{}", date)] { + for (k, n) in &store.nodes { + if !n.deleted && n.node_type == parent_type && k.starts_with(prefix.as_str()) { + return Some(k.clone()); + } + } + } + } + } + + None +} + +/// Digest children: find nodes that feed into this digest. +/// Monthly → weeklies, weekly → dailies, daily → journal entries. +pub(crate) fn digest_children(store: &Store, key: &str) -> Vec { + let Some(node) = store.nodes.get(key) else { return vec![] }; + + let child_type = match node.node_type { + store::NodeType::EpisodicDaily => store::NodeType::EpisodicSession, + store::NodeType::EpisodicWeekly => store::NodeType::EpisodicDaily, + store::NodeType::EpisodicMonthly => store::NodeType::EpisodicWeekly, + _ => return vec![], + }; + + // Look for structural links (source → this digest) + let mut children: Vec<(String, i64)> = Vec::new(); + for r in &store.relations { + if r.deleted { continue; } + if r.target_key == key + && let Some(source) = store.nodes.get(&r.source_key) + && source.node_type == child_type { + children.push((r.source_key.clone(), source.timestamp)); + } + } + + // Fallback for daily → journal: extract date from key and match + if children.is_empty() && node.node_type == store::NodeType::EpisodicDaily { + // Extract date from keys like "daily-2026-03-13" or "daily-2026-03-13-suffix" + let date = key.strip_prefix("daily-") + .or_else(|| key.strip_prefix("digest#daily#")) + .and_then(|rest| rest.get(..10)); // "YYYY-MM-DD" + if let Some(date) = date { + for (k, n) in &store.nodes { + if n.deleted { continue; } + if n.node_type == store::NodeType::EpisodicSession + && n.timestamp > 0 + && store::format_date(n.timestamp) == date + { + children.push((k.clone(), n.timestamp)); + } + } + } + } + + children.sort_by_key(|(_, t)| *t); + children.into_iter().map(|(k, _)| k).collect() +} + +/// Graph neighbors sorted by edge strength. +pub(crate) fn graph_neighbors(store: &Store, key: &str) -> Vec<(String, f32)> { + let mut neighbors: Vec<(String, f32)> = Vec::new(); + for r in &store.relations { + if r.deleted { continue; } + if r.source_key == key { + neighbors.push((r.target_key.clone(), r.strength)); + } else if r.target_key == key { + neighbors.push((r.source_key.clone(), r.strength)); + } + } + neighbors.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + neighbors.dedup_by(|a, b| a.0 == b.0); + neighbors +} + +/// Format a one-line summary of a node for context display. +fn node_summary(node: &Node) -> String { + let ts = if node.timestamp > 0 { + store::format_datetime(node.timestamp) + } else { + "no-date".to_string() + }; + let type_tag = match node.node_type { + store::NodeType::EpisodicSession => "journal", + store::NodeType::EpisodicDaily => "daily", + store::NodeType::EpisodicWeekly => "weekly", + store::NodeType::EpisodicMonthly => "monthly", + store::NodeType::Semantic => "semantic", + }; + // First line of content, truncated + let first_line = node.content.lines().next().unwrap_or("") + .chars().take(80).collect::(); + format!("[{}] ({}) {}", ts, type_tag, first_line) +} + +/// Display the cursor position with full context. +pub fn show(store: &Store) -> Result<(), String> { + let key = get().ok_or_else(|| "No cursor set. Use `poc-memory cursor set KEY`".to_string())?; + let node = store.nodes.get(&key) + .ok_or_else(|| format!("Cursor points to missing node: {}", key))?; + + // Header + let type_tag = match node.node_type { + store::NodeType::EpisodicSession => "journal", + store::NodeType::EpisodicDaily => "daily", + store::NodeType::EpisodicWeekly => "weekly", + store::NodeType::EpisodicMonthly => "monthly", + store::NodeType::Semantic => "semantic", + }; + if node.timestamp > 0 { + eprintln!("@ {} [{}]", key, type_tag); + eprintln!(" {}", store::format_datetime(node.timestamp)); + } else { + eprintln!("@ {} [{}]", key, type_tag); + } + + // Temporal context + let (prev, next) = temporal_neighbors(store, &key); + eprintln!(); + if let Some(ref p) = prev + && let Some(pn) = store.nodes.get(p) { + eprintln!(" ← {}", node_summary(pn)); + eprintln!(" `cursor back`"); + } + if let Some(ref n) = next + && let Some(nn) = store.nodes.get(n) { + eprintln!(" → {}", node_summary(nn)); + eprintln!(" `cursor forward`"); + } + + // Hierarchy + if let Some(ref parent) = digest_parent(store, &key) + && let Some(pn) = store.nodes.get(parent) { + eprintln!(" ↑ {}", node_summary(pn)); + eprintln!(" `cursor up`"); + } + let children = digest_children(store, &key); + if !children.is_empty() { + let count = children.len(); + if let Some(first) = children.first().and_then(|k| store.nodes.get(k)) { + eprintln!(" ↓ {} children — first: {}", count, node_summary(first)); + eprintln!(" `cursor down`"); + } + } + + // Graph neighbors (non-temporal) + let neighbors = graph_neighbors(store, &key); + let semantic: Vec<_> = neighbors.iter() + .filter(|(k, _)| { + store.nodes.get(k) + .map(|n| n.node_type == store::NodeType::Semantic) + .unwrap_or(false) + }) + .take(8) + .collect(); + if !semantic.is_empty() { + eprintln!(); + eprintln!(" Linked:"); + for (k, strength) in &semantic { + eprintln!(" [{:.1}] {}", strength, k); + } + } + + eprintln!(); + eprintln!("---"); + + // Content + print!("{}", node.content); + + Ok(()) +} + +/// Move cursor in a temporal direction. +pub fn move_temporal(store: &Store, forward: bool) -> Result<(), String> { + let key = get().ok_or("No cursor set")?; + let _ = store.nodes.get(&key) + .ok_or_else(|| format!("Cursor points to missing node: {}", key))?; + + let (prev, next) = temporal_neighbors(store, &key); + let target = if forward { next } else { prev }; + match target { + Some(k) => { + set(&k)?; + show(store) + } + None => { + let dir = if forward { "forward" } else { "back" }; + Err(format!("No {} neighbor from {}", dir, key)) + } + } +} + +/// Move cursor up the digest hierarchy. +pub fn move_up(store: &Store) -> Result<(), String> { + let key = get().ok_or("No cursor set")?; + match digest_parent(store, &key) { + Some(parent) => { + set(&parent)?; + show(store) + } + None => Err(format!("No parent digest for {}", key)), + } +} + +/// Move cursor down the digest hierarchy (to first child). +pub fn move_down(store: &Store) -> Result<(), String> { + let key = get().ok_or("No cursor set")?; + let children = digest_children(store, &key); + match children.first() { + Some(child) => { + set(child)?; + show(store) + } + None => Err(format!("No children for {}", key)), + } +} diff --git a/src/hippocampus/mod.rs b/src/hippocampus/mod.rs index 39749ba..9e1300a 100644 --- a/src/hippocampus/mod.rs +++ b/src/hippocampus/mod.rs @@ -9,6 +9,7 @@ pub mod memory; pub mod store; pub mod graph; pub mod lookups; +pub mod cursor; pub mod query; pub mod spectral; pub mod neuro; diff --git a/src/lib.rs b/src/lib.rs index 06acbf6..b2aaab5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,9 +70,15 @@ pub mod channel_capnp { // Re-exports — all existing crate::X paths keep working pub use hippocampus::{ - store, graph, lookups, query, + store, graph, lookups, cursor, query, spectral, neuro, counters, transcript, memory, }; -use hippocampus::query::engine as search; -use hippocampus::query::parser as query_parser; +pub use hippocampus::query::engine as search; +pub use hippocampus::query::parser as query_parser; + +pub use subconscious as agents; +pub use subconscious::{ + audit, consolidate, + digest, +}; diff --git a/src/main.rs b/src/main.rs index b528ec5..d688f13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -203,6 +203,12 @@ EXAMPLES: #[command(subcommand, name = "graph")] GraphCmd(GraphCmd), + // ── Cursor (spatial memory) ────────────────────────────────────── + + /// Navigate the memory graph with a persistent cursor + #[command(subcommand)] + Cursor(CursorCmd), + // ── Agents ──────────────────────────────────────────────────────── /// Agent and daemon operations @@ -243,6 +249,27 @@ enum NodeCmd { Dump, } +#[derive(Subcommand)] +enum CursorCmd { + /// Show current cursor position with context + Show, + /// Set cursor to a node key + Set { + /// Node key + key: Vec, + }, + /// Move cursor forward in time + Forward, + /// Move cursor backward in time + Back, + /// Move up the digest hierarchy (journal→daily→weekly→monthly) + Up, + /// Move down the digest hierarchy (to first child) + Down, + /// Clear the cursor + Clear, +} + #[derive(Subcommand)] enum JournalCmd { /// Write a journal entry to the store @@ -264,6 +291,16 @@ enum JournalCmd { #[arg(long, default_value_t = 0)] level: u8, }, + /// Enrich journal entry with conversation links + Enrich { + /// Path to JSONL transcript + jsonl_path: String, + /// Journal entry text to enrich + entry_text: String, + /// Grep line number for source location + #[arg(default_value_t = 0)] + grep_line: usize, + }, } #[derive(Subcommand)] @@ -309,6 +346,13 @@ enum GraphCmd { /// Target node key target: String, }, + /// Walk every link, send to Sonnet for quality review + #[command(name = "link-audit")] + LinkAudit { + /// Apply changes (default: dry run) + #[arg(long)] + apply: bool, + }, /// Cap node degree by pruning weak auto edges #[command(name = "cap-degree")] CapDegree { @@ -357,6 +401,64 @@ enum GraphCmd { #[derive(Subcommand)] enum AgentCmd { + /// Run knowledge agents to convergence + #[command(name = "knowledge-loop")] + KnowledgeLoop { + /// Maximum cycles before stopping + #[arg(long, default_value_t = 20)] + max_cycles: usize, + /// Items per agent per cycle + #[arg(long, default_value_t = 5)] + batch_size: usize, + /// Cycles to check for convergence + #[arg(long, default_value_t = 5)] + window: usize, + /// Maximum inference depth + #[arg(long, default_value_t = 4)] + max_depth: i32, + }, + /// Run agent consolidation on priority nodes + #[command(name = "consolidate-batch")] + ConsolidateBatch { + /// Number of nodes to consolidate + #[arg(long, default_value_t = 5)] + count: usize, + /// Generate replay agent prompt automatically + #[arg(long)] + auto: bool, + /// Generate prompt for a specific agent (replay, linker, separator, transfer, health) + #[arg(long)] + agent: Option, + }, + /// Analyze metrics, plan agent allocation + #[command(name = "consolidate-session")] + ConsolidateSession, + /// Autonomous: plan → agents → apply → digests → links + #[command(name = "consolidate-full")] + ConsolidateFull, + /// Import pending agent results into the graph + #[command(name = "apply-agent")] + ApplyAgent { + /// Process all files without moving to done/ + #[arg(long)] + all: bool, + }, + /// Extract and apply actions from consolidation reports + #[command(name = "apply-consolidation")] + ApplyConsolidation { + /// Apply actions (default: dry run) + #[arg(long)] + apply: bool, + /// Read from specific report file + #[arg(long)] + report: Option, + }, + /// Generate episodic digests (daily, weekly, monthly, auto) + Digest { + /// Digest type: daily, weekly, monthly, auto + #[command(subcommand)] + level: DigestLevel, + }, /// Parse and apply links from digest nodes #[command(name = "digest-links")] DigestLinks { @@ -364,6 +466,36 @@ enum AgentCmd { #[arg(long)] apply: bool, }, + /// Mine conversation for experiential moments to journal + #[command(name = "experience-mine")] + ExperienceMine { + /// Path to JSONL transcript (default: most recent) + jsonl_path: Option, + }, + /// Extract atomic facts from conversation transcripts + #[command(name = "fact-mine")] + FactMine { + /// Path to JSONL transcript or directory (with --batch) + path: String, + /// Process all .jsonl files in directory + #[arg(long)] + batch: bool, + /// Show chunks without calling model + #[arg(long)] + dry_run: bool, + /// Write JSON to file (default: stdout) + #[arg(long, short)] + output: Option, + /// Skip transcripts with fewer messages + #[arg(long, default_value_t = 10)] + min_messages: usize, + }, + /// Extract facts from a transcript and store directly + #[command(name = "fact-mine-store")] + FactMineStore { + /// Path to JSONL transcript + path: String, + }, /// Run a single agent by name Run { /// Agent name (e.g. observation, linker, distill) @@ -394,6 +526,19 @@ enum AgentCmd { #[arg(long, default_value_t = 10)] count: usize, }, + /// Evaluate agent quality by LLM-sorted ranking + #[command(name = "evaluate")] + Evaluate { + /// Number of pairwise matchups to run + #[arg(long, default_value_t = 30)] + matchups: usize, + /// Model to use for comparison (haiku or sonnet) + #[arg(long, default_value = "haiku")] + model: String, + /// Show example comparison prompt without calling LLM + #[arg(long)] + dry_run: bool, + }, } #[derive(Subcommand)] @@ -464,6 +609,27 @@ enum AdminCmd { MigrateTranscriptProgress, } +#[derive(Subcommand)] +enum DigestLevel { + /// Generate daily digest + Daily { + /// Date (default: today) + date: Option, + }, + /// Generate weekly digest + Weekly { + /// Date or week label (default: current week) + date: Option, + }, + /// Generate monthly digest + Monthly { + /// Month (YYYY-MM) or date (default: current month) + date: Option, + }, + /// Generate all missing digests + Auto, +} + /// Print help with subcommands expanded to show nested commands. fn print_help() { use clap::CommandFactory; @@ -530,6 +696,7 @@ impl Run for Command { Self::Node(sub) => sub.run(), Self::Journal(sub) => sub.run(), Self::GraphCmd(sub) => sub.run(), + Self::Cursor(sub) => sub.run(), Self::Agent(sub) => sub.run(), Self::Admin(sub) => sub.run(), // mcp-schema moved to consciousness-mcp binary @@ -554,6 +721,8 @@ impl Run for JournalCmd { match self { Self::Write { name, text } => cli::journal::cmd_journal_write(&name, &text), Self::Tail { n, full, level } => cli::journal::cmd_journal_tail(n, full, level), + Self::Enrich { jsonl_path, entry_text, grep_line } + => cli::agent::cmd_journal_enrich(&jsonl_path, &entry_text, grep_line), } } } @@ -568,6 +737,7 @@ impl Run for GraphCmd { Self::LinkSet { source, target, strength } => cli::graph::cmd_link_set(&source, &target, strength), Self::LinkImpact { source, target } => cli::graph::cmd_link_impact(&source, &target), + Self::LinkAudit { apply } => cli::graph::cmd_link_audit(apply), Self::CapDegree { max_degree } => cli::graph::cmd_cap_degree(max_degree), Self::NormalizeStrengths { apply } => cli::graph::cmd_normalize_strengths(apply), Self::Trace { key } => cli::graph::cmd_trace(&key), @@ -579,13 +749,57 @@ impl Run for GraphCmd { } } +impl Run for CursorCmd { + fn run(self) -> Result<(), String> { + match self { + Self::Show => { + let store = store::Store::load()?; + cursor::show(&store) + } + Self::Set { key } => { + if key.is_empty() { return Err("cursor set requires a key".into()); } + let key = key.join(" "); + let store = store::Store::load()?; + let bare = store::strip_md_suffix(&key); + if !store.nodes.contains_key(&bare) { + return Err(format!("Node not found: {}", bare)); + } + cursor::set(&bare)?; + cursor::show(&store) + } + Self::Forward => { let s = store::Store::load()?; cursor::move_temporal(&s, true) } + Self::Back => { let s = store::Store::load()?; cursor::move_temporal(&s, false) } + Self::Up => { let s = store::Store::load()?; cursor::move_up(&s) } + Self::Down => { let s = store::Store::load()?; cursor::move_down(&s) } + Self::Clear => cursor::clear(), + } + } +} + impl Run for AgentCmd { fn run(self) -> Result<(), String> { match self { + Self::KnowledgeLoop { max_cycles, batch_size, window, max_depth } + => cli::agent::cmd_knowledge_loop(max_cycles, batch_size, window, max_depth), + Self::ConsolidateBatch { count, auto, agent } + => cli::agent::cmd_consolidate_batch(count, auto, agent), + Self::ConsolidateSession => cli::agent::cmd_consolidate_session(), + Self::ConsolidateFull => cli::agent::cmd_consolidate_full(), + Self::ApplyAgent { all } => cmd_apply_agent(all), + Self::ApplyConsolidation { apply, report } + => cli::agent::cmd_apply_consolidation(apply, report.as_deref()), + Self::Digest { level } => cmd_digest(level), Self::DigestLinks { apply } => cli::agent::cmd_digest_links(apply), + Self::ExperienceMine { .. } + => Err("experience-mine has been removed — use the observation agent instead.".into()), + Self::FactMine { path, batch, dry_run, output, min_messages } + => cli::agent::cmd_fact_mine(&path, batch, dry_run, output.as_deref(), min_messages), + Self::FactMineStore { path } => cli::agent::cmd_fact_mine_store(&path), Self::Run { agent, count, target, query, dry_run, local, state_dir } => cli::agent::cmd_run_agent(&agent, count, &target, query.as_deref(), dry_run, local, state_dir.as_deref()), Self::ReplayQueue { count } => cli::agent::cmd_replay_queue(count), + Self::Evaluate { matchups, model, dry_run } + => cli::agent::cmd_evaluate_agents(matchups, &model, dry_run), } } } @@ -639,3 +853,166 @@ fn main() { } } +// ── Command implementations ───────────────────────────────────────── + +/// Apply links from a single agent result JSON file. +/// Returns (links_applied, errors). +fn apply_agent_file( + store: &mut store::Store, + data: &serde_json::Value, +) -> (usize, usize) { + let agent_result = data.get("agent_result").or(Some(data)); + let links = match agent_result.and_then(|r| r.get("links")).and_then(|l| l.as_array()) { + Some(l) => l, + None => return (0, 0), + }; + + let entry_text = data.get("entry_text") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + if let (Some(start), Some(end)) = ( + agent_result.and_then(|r| r.get("source_start")).and_then(|v| v.as_u64()), + agent_result.and_then(|r| r.get("source_end")).and_then(|v| v.as_u64()), + ) { + println!(" Source: L{}-L{}", start, end); + } + + let mut applied = 0; + let mut errors = 0; + + for link in links { + let target = match link.get("target").and_then(|v| v.as_str()) { + Some(t) => t, + None => continue, + }; + let reason = link.get("reason").and_then(|v| v.as_str()).unwrap_or(""); + + if let Some(note) = target.strip_prefix("NOTE:") { + println!(" NOTE: {} — {}", note, reason); + continue; + } + + let resolved = match store.resolve_key(target) { + Ok(r) => r, + Err(_) => { + println!(" SKIP {} (not found in graph)", target); + continue; + } + }; + + let source_key = match store.find_journal_node(entry_text) { + Some(k) => k, + None => { + println!(" SKIP {} (no matching journal node)", target); + continue; + } + }; + + let source_uuid = match store.nodes.get(&source_key) { + Some(n) => n.uuid, + None => continue, + }; + let target_uuid = match store.nodes.get(&resolved) { + Some(n) => n.uuid, + None => continue, + }; + + let rel = store::new_relation( + source_uuid, target_uuid, + store::RelationType::Link, + 0.5, + &source_key, &resolved, + ); + if let Err(e) = store.add_relation(rel) { + eprintln!(" Error adding relation: {}", e); + errors += 1; + } else { + println!(" LINK {} → {} ({})", source_key, resolved, reason); + applied += 1; + } + } + + (applied, errors) +} + +fn cmd_apply_agent(process_all: bool) -> Result<(), String> { + let results_dir = store::memory_dir().join("agent-results"); + + if !results_dir.exists() { + println!("No agent results directory"); + return Ok(()); + } + + let mut store = store::Store::load()?; + let mut applied = 0; + let mut errors = 0; + + let mut files: Vec<_> = std::fs::read_dir(&results_dir) + .map_err(|e| format!("read results dir: {}", e))? + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension().map(|x| x == "json").unwrap_or(false)) + .collect(); + files.sort_by_key(|e| e.path()); + + for entry in &files { + let path = entry.path(); + let content = match std::fs::read_to_string(&path) { + Ok(c) => c, + Err(e) => { + eprintln!(" Skip {}: {}", path.display(), e); + errors += 1; + continue; + } + }; + + let data: serde_json::Value = match serde_json::from_str(&content) { + Ok(d) => d, + Err(e) => { + eprintln!(" Skip {}: parse error: {}", path.display(), e); + errors += 1; + continue; + } + }; + + println!("Processing {}:", path.file_name().unwrap().to_string_lossy()); + let (a, e) = apply_agent_file(&mut store, &data); + applied += a; + errors += e; + + if !process_all { + let done_dir = crate::util::memory_subdir("agent-results/done")?; + let dest = done_dir.join(path.file_name().unwrap()); + std::fs::rename(&path, &dest).ok(); + } + } + + if applied > 0 { + store.save()?; + } + + println!("\nApplied {} links ({} errors, {} files processed)", + applied, errors, files.len()); + Ok(()) +} + +fn cmd_digest(level: DigestLevel) -> Result<(), String> { + let mut store = store::Store::load()?; + + match level { + DigestLevel::Auto => digest::digest_auto(&mut store), + DigestLevel::Daily { date } => { + let arg = date.unwrap_or_else(|| store::format_date(store::now_epoch())); + digest::generate(&mut store, "daily", &arg) + } + DigestLevel::Weekly { date } => { + let arg = date.unwrap_or_else(|| store::format_date(store::now_epoch())); + digest::generate(&mut store, "weekly", &arg) + } + DigestLevel::Monthly { date } => { + let arg = date.unwrap_or_else(|| store::format_date(store::now_epoch())); + digest::generate(&mut store, "monthly", &arg) + } + } +} + diff --git a/src/mind/subconscious.rs b/src/mind/subconscious.rs index b61d03a..fead155 100644 --- a/src/mind/subconscious.rs +++ b/src/mind/subconscious.rs @@ -319,7 +319,7 @@ struct SubconsciousAgent { forked_agent: Option>, /// Entry index where the fork diverged from the conscious agent. fork_point: usize, - handle: Option)>>, + handle: Option)>>, } impl SubconsciousAgent { diff --git a/src/mind/unconscious.rs b/src/mind/unconscious.rs index a26e6ee..bb6781c 100644 --- a/src/mind/unconscious.rs +++ b/src/mind/unconscious.rs @@ -34,7 +34,7 @@ struct UnconsciousAgent { name: String, enabled: bool, auto: AutoAgent, - handle: Option)>>, + handle: Option)>>, /// Shared agent handle — UI locks to read context live. pub agent: Option>, last_run: Option, diff --git a/src/subconscious/agents/digest.agent b/src/subconscious/agents/digest.agent index efe4bc6..c342d25 100644 --- a/src/subconscious/agents/digest.agent +++ b/src/subconscious/agents/digest.agent @@ -23,14 +23,8 @@ summaries into weekly ones. 2. Read the undigested entries with `journal_tail` (level 0, after the last digest date). -3. Check if the most recent digest at this level should be updated - (same date/week/month) — if so, use `journal_update` with the - appropriate level to append to it. - -4. If starting a new period, use `journal_new` with the level: - - level=1 for daily digests - - level=2 for weekly digests - - level=3 for monthly digests +3. Write the digest with `memory_write` and link source entries + to it with `memory_link_add`. ## Writing style diff --git a/src/subconscious/audit.rs b/src/subconscious/audit.rs new file mode 100644 index 0000000..7e297fa --- /dev/null +++ b/src/subconscious/audit.rs @@ -0,0 +1,333 @@ +// Link audit: walk every link in the graph, batch to Sonnet for quality review. +// +// Each batch of links gets reviewed by Sonnet, which returns per-link actions: +// KEEP, DELETE, RETARGET, WEAKEN, STRENGTHEN. Batches run in parallel via rayon. + +use crate::store::{self, Store, new_relation}; + +use std::collections::HashSet; + +struct LinkInfo { + rel_idx: usize, + source_key: String, + target_key: String, + source_content: String, + target_content: String, + strength: f32, + target_sections: Vec, +} + +pub struct AuditStats { + pub kept: usize, + pub deleted: usize, + pub retargeted: usize, + pub weakened: usize, + pub strengthened: usize, + pub errors: usize, +} + +fn build_audit_prompt(batch: &[LinkInfo], batch_num: usize, total_batches: usize) -> String { + let mut prompt = format!( + "You are auditing memory graph links for quality (batch {}/{}).\n\n\ + For each numbered link, decide what to do:\n\n\ + KEEP N — link is meaningful, leave it\n\ + DELETE N — link is noise, accidental, or too generic to be useful\n\ + RETARGET N new_key — link points to the right topic area but wrong node;\n\ + \x20 retarget to a more specific section (listed under each link)\n\ + WEAKEN N strength — link is marginal; reduce strength (0.1-0.3)\n\ + STRENGTHEN N strength — link is important but underweighted; increase (0.8-1.0)\n\n\ + Output exactly one action per link number, nothing else.\n\n\ + Links to review:\n\n", + batch_num, total_batches); + + for (i, link) in batch.iter().enumerate() { + let n = i + 1; + prompt.push_str(&format!( + "--- Link {} ---\n\ + {} → {} (strength={:.2})\n\n\ + Source content:\n{}\n\n\ + Target content:\n{}\n", + n, link.source_key, link.target_key, link.strength, + &link.source_content, &link.target_content)); + + if !link.target_sections.is_empty() { + prompt.push_str( + "\nTarget has sections (consider RETARGET to a more specific one):\n"); + for s in &link.target_sections { + prompt.push_str(&format!(" - {}\n", s)); + } + } + prompt.push('\n'); + } + + prompt +} + +fn parse_audit_response(response: &str, batch_size: usize) -> Vec<(usize, AuditAction)> { + let mut actions = Vec::new(); + + for line in response.lines() { + let line = line.trim(); + if line.is_empty() { continue; } + + let parts: Vec<&str> = line.splitn(3, ' ').collect(); + if parts.len() < 2 { continue; } + + let action = parts[0].to_uppercase(); + let idx: usize = match parts[1].parse::() { + Ok(n) if n >= 1 && n <= batch_size => n - 1, + _ => continue, + }; + + let audit_action = match action.as_str() { + "KEEP" => AuditAction::Keep, + "DELETE" => AuditAction::Delete, + "RETARGET" => { + if parts.len() < 3 { continue; } + AuditAction::Retarget(parts[2].trim().to_string()) + } + "WEAKEN" => { + if parts.len() < 3 { continue; } + match parts[2].trim().parse::() { + Ok(s) => AuditAction::Weaken(s), + Err(_) => continue, + } + } + "STRENGTHEN" => { + if parts.len() < 3 { continue; } + match parts[2].trim().parse::() { + Ok(s) => AuditAction::Strengthen(s), + Err(_) => continue, + } + } + _ => continue, + }; + + actions.push((idx, audit_action)); + } + + actions +} + +enum AuditAction { + Keep, + Delete, + Retarget(String), + Weaken(f32), + Strengthen(f32), +} + +/// Run a full link audit: walk every link, batch to Sonnet, apply results. +pub fn link_audit(store: &mut Store, apply: bool) -> Result { + // Collect all non-deleted relations with their info + let mut links: Vec = Vec::new(); + + for (idx, rel) in store.relations.iter().enumerate() { + if rel.deleted { continue; } + + let source_content = store.nodes.get(&rel.source_key) + .map(|n| n.content.clone()).unwrap_or_default(); + let target_content = store.nodes.get(&rel.target_key) + .map(|n| n.content.clone()).unwrap_or_default(); + + // Find section children of target if it's file-level + let target_sections = if !rel.target_key.contains('#') { + let prefix = format!("{}#", rel.target_key); + store.nodes.keys() + .filter(|k| k.starts_with(&prefix)) + .cloned() + .collect() + } else { + Vec::new() + }; + + links.push(LinkInfo { + rel_idx: idx, + source_key: rel.source_key.clone(), + target_key: rel.target_key.clone(), + source_content, + target_content, + strength: rel.strength, + target_sections, + }); + } + + let total = links.len(); + println!("Link audit: {} links to review", total); + if !apply { + println!("DRY RUN — use --apply to make changes"); + } + + // Batch by char budget (~100K chars per prompt) + let char_budget = 100_000usize; + let mut batches: Vec> = Vec::new(); + let mut current_batch: Vec = Vec::new(); + let mut current_chars = 0usize; + + for (i, link) in links.iter().enumerate() { + let link_chars = link.source_content.len() + link.target_content.len() + 200; + if !current_batch.is_empty() && current_chars + link_chars > char_budget { + batches.push(std::mem::take(&mut current_batch)); + current_chars = 0; + } + current_batch.push(i); + current_chars += link_chars; + } + if !current_batch.is_empty() { + batches.push(current_batch); + } + + let total_batches = batches.len(); + println!("{} batches (avg {} links/batch)\n", total_batches, + if total_batches > 0 { total / total_batches } else { 0 }); + + use rayon::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + // Build all batch prompts up front + let batch_data: Vec<(usize, Vec, String)> = batches.iter().enumerate() + .map(|(batch_idx, batch_indices)| { + let batch_infos: Vec = batch_indices.iter().map(|&i| { + let l = &links[i]; + LinkInfo { + rel_idx: l.rel_idx, + source_key: l.source_key.clone(), + target_key: l.target_key.clone(), + source_content: l.source_content.clone(), + target_content: l.target_content.clone(), + strength: l.strength, + target_sections: l.target_sections.clone(), + } + }).collect(); + let prompt = build_audit_prompt(&batch_infos, batch_idx + 1, total_batches); + (batch_idx, batch_infos, prompt) + }) + .collect(); + + // Progress counter + let done = AtomicUsize::new(0); + + // Run batches in parallel via rayon + let batch_results: Vec<_> = batch_data.par_iter() + .map(|(batch_idx, batch_infos, prompt)| { + let response = crate::agent::oneshot::call_api_with_tools_sync( + "audit", &[prompt.clone()], &[], None, 10, &[], None); + let completed = done.fetch_add(1, Ordering::Relaxed) + 1; + eprint!("\r Batches: {}/{} done", completed, total_batches); + (*batch_idx, batch_infos, response) + }) + .collect(); + eprintln!(); // newline after progress + + // Process results sequentially + let mut stats = AuditStats { + kept: 0, deleted: 0, retargeted: 0, weakened: 0, strengthened: 0, errors: 0, + }; + let mut deletions: Vec = Vec::new(); + let mut retargets: Vec<(usize, String)> = Vec::new(); + let mut strength_changes: Vec<(usize, f32)> = Vec::new(); + + for (batch_idx, batch_infos, response) in &batch_results { + let response = match response { + Ok(r) => r, + Err(e) => { + eprintln!(" Batch {}: error: {}", batch_idx + 1, e); + stats.errors += batch_infos.len(); + continue; + } + }; + + let actions = parse_audit_response(response, batch_infos.len()); + + let mut responded: HashSet = HashSet::new(); + + for (idx, action) in &actions { + responded.insert(*idx); + let link = &batch_infos[*idx]; + + match action { + AuditAction::Keep => { + stats.kept += 1; + } + AuditAction::Delete => { + println!(" DELETE {} → {}", link.source_key, link.target_key); + deletions.push(link.rel_idx); + stats.deleted += 1; + } + AuditAction::Retarget(new_target) => { + println!(" RETARGET {} → {} (was {})", + link.source_key, new_target, link.target_key); + retargets.push((link.rel_idx, new_target.clone())); + stats.retargeted += 1; + } + AuditAction::Weaken(s) => { + println!(" WEAKEN {} → {} (str {:.2} → {:.2})", + link.source_key, link.target_key, link.strength, s); + strength_changes.push((link.rel_idx, *s)); + stats.weakened += 1; + } + AuditAction::Strengthen(s) => { + println!(" STRENGTHEN {} → {} (str {:.2} → {:.2})", + link.source_key, link.target_key, link.strength, s); + strength_changes.push((link.rel_idx, *s)); + stats.strengthened += 1; + } + } + } + + for i in 0..batch_infos.len() { + if !responded.contains(&i) { + stats.kept += 1; + } + } + + println!(" Batch {}/{}: +{}kept +{}del +{}retarget +{}weak +{}strong", + batch_idx + 1, total_batches, + stats.kept, stats.deleted, stats.retargeted, stats.weakened, stats.strengthened); + } + + // Apply changes + if apply && (stats.deleted > 0 || stats.retargeted > 0 + || stats.weakened > 0 || stats.strengthened > 0) { + println!("\nApplying changes..."); + + // Deletions: soft-delete + for rel_idx in &deletions { + store.relations[*rel_idx].deleted = true; + } + + // Strength changes + for (rel_idx, new_strength) in &strength_changes { + store.relations[*rel_idx].strength = *new_strength; + } + + // Retargets: soft-delete old, create new + for (rel_idx, new_target) in &retargets { + let source_key = store.relations[*rel_idx].source_key.clone(); + let old_strength = store.relations[*rel_idx].strength; + let source_uuid = store.nodes.get(&source_key) + .map(|n| n.uuid).unwrap_or([0u8; 16]); + let target_uuid = store.nodes.get(new_target) + .map(|n| n.uuid).unwrap_or([0u8; 16]); + + // Soft-delete old + store.relations[*rel_idx].deleted = true; + + // Create new + if target_uuid != [0u8; 16] { + let new_rel = new_relation( + source_uuid, target_uuid, + store::RelationType::Auto, + old_strength, + &source_key, new_target, + ); + store.add_relation(new_rel).ok(); + } + } + + store.save()?; + println!("Saved."); + } + + Ok(stats) +} diff --git a/src/subconscious/consolidate.rs b/src/subconscious/consolidate.rs new file mode 100644 index 0000000..9f125c8 --- /dev/null +++ b/src/subconscious/consolidate.rs @@ -0,0 +1,164 @@ +// Consolidation pipeline: plan → agents → maintenance → digests → links +// +// consolidate_full() runs the full autonomous consolidation: +// 1. Plan: analyze metrics, allocate agents +// 2. Execute: run each agent (agents apply changes via tool calls) +// 3. Graph maintenance (orphans, degree cap) +// 4. Digest: generate missing daily/weekly/monthly digests +// 5. Links: apply links extracted from digests +// 6. Summary: final metrics comparison + +use super::digest; +use crate::agent::oneshot; +use crate::neuro; +use crate::store::{self, Store}; + + +/// Append a line to the log buffer. +fn log_line(buf: &mut String, line: &str) { + buf.push_str(line); + buf.push('\n'); +} + +/// Run the full autonomous consolidation pipeline with logging. +pub fn consolidate_full(store: &mut Store) -> Result<(), String> { + consolidate_full_with_progress(store, &|_| {}) +} + +fn consolidate_full_with_progress( + store: &mut Store, + on_progress: &dyn Fn(&str), +) -> Result<(), String> { + let start = std::time::Instant::now(); + let log_key = format!("_consolidate-log-{}", store::compact_timestamp()); + let mut log_buf = String::new(); + + log_line(&mut log_buf, "=== CONSOLIDATE FULL ==="); + log_line(&mut log_buf, &format!("Started: {}", store::format_datetime(store::now_epoch()))); + log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); + log_line(&mut log_buf, ""); + + // --- Step 1: Plan --- + log_line(&mut log_buf, "--- Step 1: Plan ---"); + on_progress("planning"); + let plan = neuro::consolidation_plan(store); + let plan_text = neuro::format_plan(&plan); + log_line(&mut log_buf, &plan_text); + println!("{}", plan_text); + + let total_agents = plan.total(); + log_line(&mut log_buf, &format!("Total agents to run: {}", total_agents)); + + // --- Step 2: Execute agents --- + log_line(&mut log_buf, "\n--- Step 2: Execute agents ---"); + let mut agent_num = 0usize; + let mut agent_errors = 0usize; + + let batch_size = 5; + let runs = plan.to_agent_runs(batch_size); + + for (agent_type, count) in &runs { + agent_num += 1; + let label = if *count > 0 { + format!("[{}/{}] {} (batch={})", agent_num, runs.len(), agent_type, count) + } else { + format!("[{}/{}] {}", agent_num, runs.len(), agent_type) + }; + + log_line(&mut log_buf, &format!("\n{}", label)); + on_progress(&label); + println!("{}", label); + + // Reload store to pick up changes from previous agents + if agent_num > 1 { + *store = Store::load()?; + } + + match oneshot::run_one_agent(store, agent_type, *count, None) { + Ok(_) => { + let msg = " Done".to_string(); + log_line(&mut log_buf, &msg); + on_progress(&msg); + println!("{}", msg); + } + Err(e) => { + let msg = format!(" ERROR: {}", e); + log_line(&mut log_buf, &msg); + eprintln!("{}", msg); + agent_errors += 1; + } + } + } + + log_line(&mut log_buf, &format!("\nAgents complete: {} run, {} errors", + agent_num - agent_errors, agent_errors)); + store.save()?; + + // --- Step 3: Cap degree --- + log_line(&mut log_buf, "\n--- Step 3: Cap degree ---"); + on_progress("capping degree"); + println!("\n--- Capping node degree ---"); + *store = Store::load()?; + + match store.cap_degree(50) { + Ok((hubs, pruned)) => { + store.save()?; + log_line(&mut log_buf, &format!(" {} hubs capped, {} edges pruned", hubs, pruned)); + } + Err(e) => log_line(&mut log_buf, &format!(" ERROR: {}", e)), + } + + // --- Step 4: Digest auto --- + log_line(&mut log_buf, "\n--- Step 4: Digest auto ---"); + on_progress("generating digests"); + println!("\n--- Generating missing digests ---"); + *store = Store::load()?; + + match digest::digest_auto(store) { + Ok(()) => log_line(&mut log_buf, " Digests done."), + Err(e) => { + let msg = format!(" ERROR in digest auto: {}", e); + log_line(&mut log_buf, &msg); + eprintln!("{}", msg); + } + } + + // --- Step 5: Apply digest links --- + log_line(&mut log_buf, "\n--- Step 5: Apply digest links ---"); + on_progress("applying digest links"); + println!("\n--- Applying digest links ---"); + *store = Store::load()?; + + let links = digest::parse_all_digest_links(store); + let (applied, skipped, fallbacks) = digest::apply_digest_links(store, &links); + store.save()?; + log_line(&mut log_buf, &format!(" {} links applied, {} skipped, {} fallbacks", + applied, skipped, fallbacks)); + + // --- Step 6: Summary --- + let elapsed = start.elapsed(); + log_line(&mut log_buf, "\n--- Summary ---"); + log_line(&mut log_buf, &format!("Finished: {}", store::format_datetime(store::now_epoch()))); + log_line(&mut log_buf, &format!("Duration: {:.0}s", elapsed.as_secs_f64())); + *store = Store::load()?; + log_line(&mut log_buf, &format!("Nodes: {} Relations: {}", store.nodes.len(), store.relations.len())); + + let summary = format!( + "\n=== CONSOLIDATE FULL COMPLETE ===\n\ + Duration: {:.0}s\n\ + Agents: {} run, {} errors\n\ + Nodes: {} Relations: {}\n", + elapsed.as_secs_f64(), + agent_num - agent_errors, agent_errors, + store.nodes.len(), store.relations.len(), + ); + log_line(&mut log_buf, &summary); + println!("{}", summary); + + // Store the log as a node + store.upsert_provenance(&log_key, &log_buf, + "consolidate:write").ok(); + store.save()?; + + Ok(()) +} diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index a7150d5..413b02f 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -1,8 +1,386 @@ -// Digest link parsing: extracts ## Links sections from digest nodes -// and applies them to the memory graph. +use std::sync::Arc; +// Episodic digest generation: daily, weekly, monthly, auto +// +// Three digest levels form a temporal hierarchy: daily digests summarize +// journal entries, weekly digests summarize dailies, monthly digests +// summarize weeklies. All three share the same generate/auto-detect +// pipeline, parameterized by DigestLevel. use crate::store::{self, Store, new_relation}; + +use chrono::{Datelike, Duration, Local, NaiveDate}; use regex::Regex; +use std::collections::BTreeSet; + +/// Get all store keys for prompt context. +fn semantic_keys(store: &Store) -> Vec { + let mut keys: Vec = store.nodes.keys().cloned().collect(); + keys.sort(); + keys.truncate(200); + keys +} + +// --- Digest level descriptors --- + +#[allow(clippy::type_complexity)] +struct DigestLevel { + name: &'static str, + title: &'static str, + period: &'static str, + input_title: &'static str, + child_name: Option<&'static str>, // None = journal (leaf), Some = child digest files + /// Expand an arg into (canonical_label, dates covered). + label_dates: fn(&str) -> Result<(String, Vec), String>, + /// Map a YYYY-MM-DD date to this level's label. + date_to_label: fn(&str) -> Option, +} + +const DAILY: DigestLevel = DigestLevel { + name: "daily", + title: "Daily", + period: "Date", + input_title: "Journal entries", + child_name: None, + label_dates: |date| Ok((date.to_string(), vec![date.to_string()])), + date_to_label: |date| Some(date.to_string()), +}; + +/// Week label and 7 dates (Mon-Sun) for the week containing `date`. +fn week_dates(date: &str) -> Result<(String, Vec), String> { + let nd = NaiveDate::parse_from_str(date, "%Y-%m-%d") + .map_err(|e| format!("bad date '{}': {}", date, e))?; + let iso = nd.iso_week(); + let week_label = format!("{}-W{:02}", iso.year(), iso.week()); + let monday = nd - Duration::days(nd.weekday().num_days_from_monday() as i64); + let dates = (0..7) + .map(|i| (monday + Duration::days(i)).format("%Y-%m-%d").to_string()) + .collect(); + Ok((week_label, dates)) +} + +const WEEKLY: DigestLevel = DigestLevel { + name: "weekly", + title: "Weekly", + period: "Week", + input_title: "Daily digests", + child_name: Some("daily"), + label_dates: |arg| { + if !arg.contains('W') { + return week_dates(arg); + } + let (y, w) = arg.split_once("-W") + .ok_or_else(|| format!("bad week label: {}", arg))?; + let year: i32 = y.parse().map_err(|_| format!("bad week year: {}", arg))?; + let week: u32 = w.parse().map_err(|_| format!("bad week number: {}", arg))?; + let monday = NaiveDate::from_isoywd_opt(year, week, chrono::Weekday::Mon) + .ok_or_else(|| format!("invalid week: {}", arg))?; + let dates = (0..7) + .map(|i| (monday + Duration::days(i)).format("%Y-%m-%d").to_string()) + .collect(); + Ok((arg.to_string(), dates)) + }, + date_to_label: |date| week_dates(date).ok().map(|(l, _)| l), +}; + +const MONTHLY: DigestLevel = DigestLevel { + name: "monthly", + title: "Monthly", + period: "Month", + input_title: "Weekly digests", + child_name: Some("weekly"), + label_dates: |arg| { + let (year, month) = if arg.len() <= 7 { + let d = NaiveDate::parse_from_str(&format!("{}-01", arg), "%Y-%m-%d") + .map_err(|e| format!("bad month '{}': {}", arg, e))?; + (d.year(), d.month()) + } else { + let d = NaiveDate::parse_from_str(arg, "%Y-%m-%d") + .map_err(|e| format!("bad date '{}': {}", arg, e))?; + (d.year(), d.month()) + }; + let label = format!("{}-{:02}", year, month); + let mut dates = Vec::new(); + let mut day = 1u32; + while let Some(date) = NaiveDate::from_ymd_opt(year, month, day) { + if date.month() != month { break; } + dates.push(date.format("%Y-%m-%d").to_string()); + day += 1; + } + Ok((label, dates)) + }, + date_to_label: |date| NaiveDate::parse_from_str(date, "%Y-%m-%d") + .ok().map(|d| format!("{}-{:02}", d.year(), d.month())), +}; + +const LEVELS: &[&DigestLevel] = &[&DAILY, &WEEKLY, &MONTHLY]; + +/// Store key for a digest node: "daily-2026-03-04", "weekly-2026-W09", etc. +fn digest_node_key(level_name: &str, label: &str) -> String { + format!("{}-{}", level_name, label) +} + +// --- Input gathering --- + +/// Result of gathering inputs for a digest. +struct GatherResult { + label: String, + /// (display_label, content) pairs for the prompt. + inputs: Vec<(String, String)>, + /// Store keys of source nodes — used to create structural links. + source_keys: Vec, +} + +/// Load child digest content from the store. +fn load_child_digests(store: &Store, prefix: &str, labels: &[String]) -> (Vec<(String, String)>, Vec) { + let mut digests = Vec::new(); + let mut keys = Vec::new(); + for label in labels { + let key = digest_node_key(prefix, label); + if let Some(node) = store.nodes.get(&key) { + digests.push((label.clone(), node.content.clone())); + keys.push(key); + } + } + (digests, keys) +} + +/// Unified: gather inputs for any digest level. +fn gather(level: &DigestLevel, store: &Store, arg: &str) -> Result { + let (label, dates) = (level.label_dates)(arg)?; + + let (inputs, source_keys) = if let Some(child_name) = level.child_name { + // Map parent's dates through child's date_to_label → child labels + let child = LEVELS.iter() + .find(|l| l.name == child_name) + .expect("invalid child_name"); + let child_labels: Vec = dates.iter() + .filter_map(|d| (child.date_to_label)(d)) + .collect::>() + .into_iter() + .collect(); + load_child_digests(store, child_name, &child_labels) + } else { + // Leaf level: scan store for episodic entries matching date + let mut entries: Vec<_> = store.nodes.iter() + .filter(|(_, n)| n.node_type == store::NodeType::EpisodicSession + && n.created_at > 0 + && store::format_date(n.created_at) == label) + .map(|(key, n)| { + (store::format_datetime(n.timestamp), n.content.clone(), key.clone()) + }) + .collect(); + entries.sort_by(|a, b| a.0.cmp(&b.0)); + let keys = entries.iter().map(|(_, _, k)| k.clone()).collect(); + let inputs = entries.into_iter().map(|(dt, c, _)| (dt, c)).collect(); + (inputs, keys) + }; + + Ok(GatherResult { label, inputs, source_keys }) +} + +/// Unified: find candidate labels for auto-generation (past, not yet generated). +fn find_candidates(level: &DigestLevel, dates: &[String], today: &str) -> Vec { + let today_label = (level.date_to_label)(today); + dates.iter() + .filter_map(|d| (level.date_to_label)(d)) + .collect::>() + .into_iter() + .filter(|l| Some(l) != today_label.as_ref()) + .collect() +} + +// --- Unified generator --- + +fn format_inputs(inputs: &[(String, String)], daily: bool) -> String { + let mut text = String::new(); + for (label, content) in inputs { + if daily { + text.push_str(&format!("\n### {}\n\n{}\n", label, content)); + } else { + text.push_str(&format!("\n---\n## {}\n{}\n", label, content)); + } + } + text +} + +fn generate_digest( + store: &mut Store, + level: &DigestLevel, + label: &str, + inputs: &[(String, String)], + source_keys: &[String], +) -> Result<(), String> { + println!("Generating {} digest for {}...", level.name, label); + + if inputs.is_empty() { + println!(" No inputs found for {}", label); + return Ok(()); + } + println!(" {} inputs", inputs.len()); + + let keys = semantic_keys(store); + let keys_text = keys.iter() + .map(|k| format!(" - {}", k)) + .collect::>() + .join("\n"); + + let content = format_inputs(inputs, level.child_name.is_none()); + let covered = inputs.iter() + .map(|(l, _)| l.as_str()) + .collect::>() + .join(", "); + + // Load agent def — drives template, temperature, priority, tools + let def = super::defs::get_def("digest") + .ok_or("no digest agent definition")?; + let template = def.steps.first() + .map(|s| s.prompt.clone()) + .ok_or("digest agent has no prompt")?; + + // Substitute digest-specific and config placeholders, then resolve + // standard {{node:...}} etc. via the placeholder system + let cfg = crate::config::get(); + let partial = template + .replace("{agent_name}", &def.agent) + .replace("{user_name}", &cfg.user_name) + .replace("{assistant_name}", &cfg.assistant_name) + .replace("{{LEVEL}}", level.title) + .replace("{{PERIOD}}", level.period) + .replace("{{INPUT_TITLE}}", level.input_title) + .replace("{{LABEL}}", label) + .replace("{{CONTENT}}", &content) + .replace("{{COVERED}}", &covered) + .replace("{{KEYS}}", &keys_text); + + let graph = store.build_graph(); + let (prompt, _) = super::defs::resolve_placeholders( + &partial, store, &graph, &[], 0, + ); + println!(" Prompt: {} chars (~{} tokens)", prompt.len(), prompt.len() / 4); + + // Log to file like other agents + let log_dir = dirs::home_dir().unwrap_or_default() + .join(".consciousness/logs/llm/digest"); + std::fs::create_dir_all(&log_dir).ok(); + let log_path = log_dir.join(format!("{}.txt", crate::store::compact_timestamp())); + let _log = move |msg: &str| { + use std::io::Write; + if let Ok(mut f) = std::fs::OpenOptions::new() + .create(true).append(true).open(&log_path) + { + let _ = writeln!(f, "{}", msg); + } + }; + + println!(" Calling LLM..."); + let prompts = vec![prompt]; + let phases: Vec = def.steps.iter().map(|s| s.phase.clone()).collect(); + // Filter tools based on agent def + let all_tools = crate::agent::tools::memory_and_journal_tools(); + let tools: Vec<_> = if def.tools.is_empty() { + all_tools.to_vec() + } else { + all_tools.into_iter() + .filter(|t| def.tools.iter().any(|w| w == &t.name)) + .collect() + }; + let digest = crate::agent::oneshot::call_api_with_tools_sync( + &def.agent, &prompts, &phases, def.temperature, def.priority, + &tools, None)?; + + let key = digest_node_key(level.name, label); + store.upsert_provenance(&key, &digest, "digest:write")?; + + // Structural links: connect all source entries to this digest + let mut linked = 0; + for source_key in source_keys { + // Skip if link already exists + let exists = store.relations.iter().any(|r| + !r.deleted && r.source_key == *source_key && r.target_key == key); + if exists { continue; } + + let source_uuid = store.nodes.get(source_key) + .map(|n| n.uuid).unwrap_or([0u8; 16]); + let target_uuid = store.nodes.get(&key) + .map(|n| n.uuid).unwrap_or([0u8; 16]); + let mut rel = new_relation( + source_uuid, target_uuid, + store::RelationType::Link, 0.8, + source_key, &key, + ); + rel.provenance = "digest:structural".to_string(); + store.add_relation(rel)?; + linked += 1; + } + if linked > 0 { + println!(" Linked {} source entries → {}", linked, key); + } + + store.save()?; + println!(" Stored: {}", key); + + println!(" Done: {} lines", digest.lines().count()); + Ok(()) +} + +// --- Public API --- + +pub fn generate(store: &mut Store, level_name: &str, arg: &str) -> Result<(), String> { + let level = LEVELS.iter() + .find(|l| l.name == level_name) + .ok_or_else(|| format!("unknown digest level: {}", level_name))?; + let result = gather(level, store, arg)?; + generate_digest(store, level, &result.label, &result.inputs, &result.source_keys) +} + +// --- Auto-detect and generate missing digests --- + +pub fn digest_auto(store: &mut Store) -> Result<(), String> { + let today = Local::now().format("%Y-%m-%d").to_string(); + + // Collect all dates with episodic entries + let dates: Vec = store.nodes.values() + .filter(|n| n.node_type == store::NodeType::EpisodicSession && n.created_at > 0) + .map(|n| store::format_date(n.created_at)) + .collect::>() + .into_iter() + .collect(); + + let mut total = 0u32; + + for level in LEVELS { + let candidates = find_candidates(level, &dates, &today); + let mut generated = 0u32; + let mut skipped = 0u32; + + for arg in &candidates { + let result = gather(level, store, arg)?; + let key = digest_node_key(level.name, &result.label); + if store.nodes.contains_key(&key) { + skipped += 1; + continue; + } + if result.inputs.is_empty() { continue; } + println!("[auto] Missing {} digest for {}", level.name, result.label); + generate_digest(store, level, &result.label, &result.inputs, &result.source_keys)?; + generated += 1; + } + + println!("[auto] {}: {} generated, {} existed", level.name, generated, skipped); + total += generated; + } + + if total == 0 { + println!("[auto] All digests up to date."); + } else { + println!("[auto] Generated {} total digests.", total); + } + Ok(()) +} + +// --- Digest link parsing --- +// Replaces digest-link-parser.py: parses ## Links sections from digest +// files and applies them to the memory graph. /// A parsed link from a digest's Links section. pub struct DigestLink { @@ -201,3 +579,110 @@ pub fn apply_digest_links(store: &mut Store, links: &[DigestLink]) -> (usize, us (applied, skipped, fallbacks) } + +// --- Tool interface for digest generation (added 2026-04-04) --- + +/// Helper: extract string argument from tool call +fn get_str_required(args: &serde_json::Value, name: &str) -> Result { + args.get(name) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .ok_or_else(|| format!("{} is required", name)) +} + +/// Wrap a Result for use in anyhow handlers. +fn str_err(r: Result) -> anyhow::Result { + r.map_err(|e| anyhow::anyhow!("{}", e)) +} + +/// digest_daily tool handler: generate a daily digest +async fn handle_digest_daily( + _agent: Option>, + args: serde_json::Value, +) -> anyhow::Result { + let date = str_err(get_str_required(&args, "date"))?; + let mut store = str_err(Store::load())?; + str_err(generate(&mut store, "daily", &date))?; + Ok(format!("Daily digest generated for {}", date)) +} + +/// digest_weekly tool handler: generate a weekly digest +async fn handle_digest_weekly( + _agent: Option>, + args: serde_json::Value, +) -> anyhow::Result { + let week_label = str_err(get_str_required(&args, "week"))?; + let mut store = str_err(Store::load())?; + str_err(generate(&mut store, "weekly", &week_label))?; + Ok(format!("Weekly digest generated for {}", week_label)) +} + +/// digest_monthly tool handler: generate a monthly digest +async fn handle_digest_monthly( + _agent: Option>, + args: serde_json::Value, +) -> anyhow::Result { + let month = str_err(get_str_required(&args, "month"))?; + let mut store = str_err(Store::load())?; + str_err(generate(&mut store, "monthly", &month))?; + Ok(format!("Monthly digest generated for {}", month)) +} + +/// digest_auto tool handler: auto-generate all missing digests +async fn handle_digest_auto( + _agent: Option>, + _args: serde_json::Value, +) -> anyhow::Result { + let mut store = str_err(Store::load())?; + str_err(digest_auto(&mut store))?; + Ok("Auto-generated all missing digests".to_string()) +} + +/// digest_links tool handler: parse and apply digest links +async fn handle_digest_links( + _agent: Option>, + _args: serde_json::Value, +) -> anyhow::Result { + let mut store = str_err(Store::load())?; + let links = parse_all_digest_links(&store); + let (applied, skipped, fallbacks) = apply_digest_links(&mut store, &links); + str_err(store.save())?; + Ok(format!("Applied {} digest links ({} skipped, {} fallback)", applied, skipped, fallbacks)) +} + +/// Return digest tools array for the tool registry +pub fn digest_tools() -> [super::super::agent::tools::Tool; 5] { + use super::super::agent::tools::Tool; + [ + Tool { + name: "digest_daily", + description: "Generate a daily digest from journal entries.", + parameters_json: r#"{"type":"object","properties":{"date":{"type":"string","description":"Date in YYYY-MM-DD format"}}, "required":["date"]}"#, + handler: Arc::new(|_a, v| Box::pin(async move { handle_digest_daily(_a, v).await })), + }, + Tool { + name: "digest_weekly", + description: "Generate a weekly digest from daily digests.", + parameters_json: r#"{"type":"object","properties":{"week":{"type":"string","description":"Week label (YYYY-W##) or date (YYYY-MM-DD)"}}, "required":["week"]}"#, + handler: Arc::new(|_a, v| Box::pin(async move { handle_digest_weekly(_a, v).await })), + }, + Tool { + name: "digest_monthly", + description: "Generate a monthly digest from weekly digests.", + parameters_json: r#"{"type":"object","properties":{"month":{"type":"string","description":"Month label (YYYY-MM) or date (YYYY-MM-DD)"}}, "required":["month"]}"#, + handler: Arc::new(|_a, v| Box::pin(async move { handle_digest_monthly(_a, v).await })), + }, + Tool { + name: "digest_auto", + description: "Auto-generate all missing digests (daily, weekly, monthly) for past dates that have content but no digest yet.", + parameters_json: r#"{"type":"object","properties":{}}"#, + handler: Arc::new(|_a, v| Box::pin(async move { handle_digest_auto(_a, v).await })), + }, + Tool { + name: "digest_links", + description: "Parse and apply structural links from digest nodes to the memory graph.", + parameters_json: r#"{"type":"object","properties":{}}"#, + handler: Arc::new(|_a, v| Box::pin(async move { handle_digest_links(_a, v).await })), + }, + ] +} diff --git a/src/subconscious/mod.rs b/src/subconscious/mod.rs index 433f721..a48620d 100644 --- a/src/subconscious/mod.rs +++ b/src/subconscious/mod.rs @@ -1,5 +1,24 @@ // Agent layer: LLM-powered operations on the memory graph +// +// Everything here calls external models (Sonnet, Haiku) or orchestrates +// sequences of such calls. The core graph infrastructure (store, graph, +// spectral, search, similarity) lives at the crate root. +// +// llm — model invocation, response parsing +// prompts — prompt generation from store data +// defs — agent file loading and placeholder resolution +// audit — link quality review via Sonnet +// consolidate — full consolidation pipeline +// knowledge — agent execution, conversation fragment selection +// enrich — journal enrichment, experience mining +// digest — episodic digest generation (daily/weekly/monthly) +// daemon — background job scheduler +// transcript — shared JSONL transcript parsing +// +// The session hook (context injection, agent orchestration) moved to claude/hook. +pub mod audit; +pub mod consolidate; pub mod daemon; pub mod defs; pub mod digest; diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index c332ce6..41c0824 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -223,26 +223,10 @@ impl SubconsciousScreen { agent.context.try_lock().ok() .map(|ctx| { - let mut views = Vec::new(); - views.push(section_to_view("System", ctx.system())); - views.push(section_to_view("Identity", ctx.identity())); - views.push(section_to_view("Journal", ctx.journal())); - - // Conversation: skip to fork point for subconscious agents let conv = ctx.conversation(); - let conv_view = section_to_view("Conversation", conv); - let fork = fork_point.min(conv_view.children.len()); - let conv_children: Vec = conv_view.children - .into_iter().skip(fork).collect(); - views.push(SectionView { - name: format!("Conversation ({} entries)", conv_children.len()), - tokens: conv_children.iter().map(|c| c.tokens).sum(), - content: String::new(), - children: conv_children, - status: String::new(), - }); - - views + let view = section_to_view("Conversation", conv); + let fork = fork_point.min(view.children.len()); + view.children.into_iter().skip(fork).collect() }) .unwrap_or_default() }