use std::sync::Arc; // tools/memory.rs — Native memory graph operations // // If running in the daemon process (STORE_HANDLE set), accesses // the store directly. Otherwise forwards to the daemon via socket. use anyhow::{Context, Result}; use std::sync::OnceLock; use crate::hippocampus::memory::MemoryNode; use crate::store::Store; // ── Store handle ─────────────────────────────────────────────── /// Global store handle. Set by daemon at startup. /// If None, tools forward to daemon socket. static STORE_HANDLE: OnceLock>> = OnceLock::new(); // Thread-local store for rpc_local fallback path. thread_local! { static LOCAL_STORE: std::cell::RefCell>>> = const { std::cell::RefCell::new(None) }; } /// Set the global store handle. Call once at daemon startup. pub fn set_store(store: Arc>) { STORE_HANDLE.set(store).ok(); } /// Check if we're running in daemon mode (have direct store access). pub fn is_daemon() -> bool { STORE_HANDLE.get().is_some() || LOCAL_STORE.with(|s| s.borrow().is_some()) } // ── Helpers ──────────────────────────────────────────────────── fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> { args.get(name).and_then(|v| v.as_str()).context(format!("{} is required", name)) } fn get_f64(args: &serde_json::Value, name: &str) -> Result { args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name)) } async fn cached_store() -> Result>> { // Check thread-local first (rpc_local fallback path) if let Some(store) = LOCAL_STORE.with(|s| s.borrow().clone()) { return Ok(store); } // Use global handle if set (daemon mode) if let Some(store) = STORE_HANDLE.get() { return Ok(store.clone()); } // Fallback to loading (for backwards compat during transition) Store::cached().await.map_err(|e| anyhow::anyhow!("{}", e)) } /// Run a tool with a temporarily-opened store (for rpc_local fallback). pub fn run_with_local_store(tool_name: &str, args: serde_json::Value) -> Result { let store = Store::load().map_err(|e| anyhow::anyhow!("{}", e))?; let arc = Arc::new(crate::Mutex::new(store)); LOCAL_STORE.with(|s| *s.borrow_mut() = Some(arc)); let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { let name = tool_name.to_string(); tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(dispatch(&name, &None, args)) })); LOCAL_STORE.with(|s| *s.borrow_mut() = None); result.map_err(|_| anyhow::anyhow!("tool panicked"))? } /// Get provenance from agent, or from args._provenance, or "manual". async fn get_provenance(agent: &Option>, args: &serde_json::Value) -> String { // Check args first (set by RPC path) if let Some(p) = args.get("_provenance").and_then(|v| v.as_str()) { return p.to_string(); } match agent { Some(a) => a.state.lock().await.provenance.clone(), None => "manual".to_string(), } } /// Single entry point for all memory/journal tool calls. /// If not daemon, forwards to daemon with provenance attached. async fn dispatch( tool_name: &str, agent: &Option>, args: serde_json::Value, ) -> Result { if !is_daemon() { // Forward to daemon, attaching provenance let mut args = args; if let Some(a) = agent { let prov = a.state.lock().await.provenance.clone(); args.as_object_mut().map(|o| o.insert("_provenance".into(), prov.into())); } let name = tool_name.to_string(); return tokio::task::spawn_blocking(move || { crate::mcp_server::memory_rpc(&name, args) }).await.map_err(|e| anyhow::anyhow!("spawn_blocking: {}", e))?; } // Daemon path - dispatch to implementation match tool_name { "memory_render" => render(&args).await, "memory_write" => write(agent, &args).await, "memory_search" => search(&args).await, "memory_links" => links(&args).await, "memory_link_set" => link_set(&args).await, "memory_link_add" => link_add(agent, &args).await, "memory_delete" => delete(&args).await, "memory_history" => history(&args).await, "memory_weight_set" => weight_set(&args).await, "memory_rename" => rename(&args).await, "memory_supersede" => supersede(agent, &args).await, "memory_query" => query(&args).await, "graph_topology" => graph_topology().await, "graph_health" => graph_health().await, "graph_communities" => graph_communities(&args).await, "graph_normalize_strengths" => graph_normalize_strengths(&args).await, "journal_tail" => journal_tail(&args).await, "journal_new" => journal_new(agent, &args).await, "journal_update" => journal_update(agent, &args).await, _ => anyhow::bail!("unknown tool: {}", tool_name), } } // ── Definitions ──────────────────────────────────────────────── pub fn memory_tools() -> [super::Tool; 14] { use super::Tool; [ Tool { name: "memory_render", description: "Read a memory node's content and links.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_render", &a, v).await })) }, Tool { name: "memory_write", description: "Create or update a memory node.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"},"content":{"type":"string","description":"Full content (markdown)"}},"required":["key","content"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_write", &a, v).await })) }, Tool { name: "memory_search", description: "Search the memory graph via spreading activation. Give 2-4 seed node keys.", parameters_json: r#"{"type":"object","properties":{"keys":{"type":"array","items":{"type":"string"},"description":"Seed node keys to activate from"},"max_hops":{"type":"integer","description":"Max graph hops (default 3)"},"edge_decay":{"type":"number","description":"Decay per hop (default 0.3)"},"min_activation":{"type":"number","description":"Cutoff threshold (default 0.01)"},"limit":{"type":"integer","description":"Max results (default 20)"}},"required":["keys"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_search", &a, v).await })) }, Tool { name: "memory_links", description: "Show a node's neighbors with link strengths.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_links", &a, v).await })) }, Tool { name: "memory_link_set", description: "Set link strength between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"},"strength":{"type":"number","description":"0.01 to 1.0"}},"required":["source","target","strength"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_link_set", &a, v).await })) }, Tool { name: "memory_link_add", description: "Add a new link between two nodes.", parameters_json: r#"{"type":"object","properties":{"source":{"type":"string"},"target":{"type":"string"}},"required":["source","target"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_link_add", &a, v).await })) }, Tool { name: "memory_delete", description: "Delete a memory node.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"}},"required":["key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_delete", &a, v).await })) }, Tool { name: "memory_history", description: "Show version history for a node.", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string","description":"Node key"},"full":{"type":"boolean","description":"Show full content for each version"}},"required":["key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_history", &a, v).await })) }, Tool { name: "memory_weight_set", description: "Set a node's weight directly (0.01 to 1.0).", parameters_json: r#"{"type":"object","properties":{"key":{"type":"string"},"weight":{"type":"number","description":"0.01 to 1.0"}},"required":["key","weight"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_weight_set", &a, v).await })) }, Tool { name: "memory_rename", description: "Rename a node key in place.", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"}},"required":["old_key","new_key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_rename", &a, v).await })) }, Tool { name: "memory_supersede", description: "Mark a node as superseded by another (sets weight to 0.01).", parameters_json: r#"{"type":"object","properties":{"old_key":{"type":"string"},"new_key":{"type":"string"},"reason":{"type":"string"}},"required":["old_key","new_key"]}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_supersede", &a, v).await })) }, Tool { name: "memory_query", description: "Run a structured query against the memory graph.", parameters_json: r#"{ "type": "object", "properties": { "query": {"type": "string", "description": "Query expression"}, "format": {"type": "string", "description": "compact (default) or full (with content and graph metrics)", "default": "compact"} }, "required": ["query"] }"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("memory_query", &a, v).await })) }, Tool { name: "graph_topology", description: "Show graph topology stats (nodes, edges, clustering, hubs).", parameters_json: r#"{"type":"object","properties":{}}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("graph_topology", &a, v).await })) }, Tool { name: "graph_health", description: "Show graph health report with maintenance recommendations.", parameters_json: r#"{"type":"object","properties":{}}"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("graph_health", &a, v).await })) }, ] } pub fn journal_tools() -> [super::Tool; 3] { use super::Tool; [ Tool { name: "journal_tail", description: "Read the last N entries at a given level.", parameters_json: r#"{ "type": "object", "properties": { "count": {"type": "integer", "description": "Number of entries", "default": 1}, "level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly", "default": 0}, "format": {"type": "string", "description": "compact or full (with content)", "default": "full"}, "after": {"type": "string", "description": "Only entries after this date (YYYY-MM-DD)"} } }"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_tail", &a, v).await })) }, Tool { name: "journal_new", description: "Start a new journal/digest entry.", parameters_json: r#"{ "type": "object", "properties": { "name": {"type": "string", "description": "Short node name (becomes the key)"}, "title": {"type": "string", "description": "Descriptive title"}, "body": {"type": "string", "description": "Entry body"}, "level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly", "default": 0} }, "required": ["name", "title", "body"] }"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_new", &a, v).await })) }, Tool { name: "journal_update", description: "Append text to the most recent entry at a level.", parameters_json: r#"{ "type": "object", "properties": { "body": {"type": "string", "description": "Text to append"}, "level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly", "default": 0} }, "required": ["body"] }"#, handler: Arc::new(|a, v| Box::pin(async move { dispatch("journal_update", &a, v).await })) }, ] } // ── Memory tools ─────────────────────────────────────────────── async fn render(args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let raw = args.get("raw").and_then(|v| v.as_bool()).unwrap_or(false); let arc = cached_store().await?; let store = arc.lock().await; let node = MemoryNode::from_store(&store, key) .ok_or_else(|| anyhow::anyhow!("node not found: {}", key))?; if raw { Ok(node.content) } else { Ok(node.render()) } } async fn write(agent: &Option>, args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let content = get_str(args, "content")?; let prov = get_provenance(agent, args).await; let arc = cached_store().await?; let mut store = arc.lock().await; let result = store.upsert_provenance(key, content, &prov) .map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("{} '{}'", result, key)) } async fn search(args: &serde_json::Value) -> Result { let keys: Vec = args.get("keys") .and_then(|v| v.as_array()) .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect()) .unwrap_or_default(); if keys.is_empty() { anyhow::bail!("memory_search requires at least one seed key"); } // Optional params with defaults let max_hops = args.get("max_hops").and_then(|v| v.as_u64()).unwrap_or(3) as u32; let edge_decay = args.get("edge_decay").and_then(|v| v.as_f64()).unwrap_or(0.3); let min_activation = args.get("min_activation").and_then(|v| v.as_f64()).unwrap_or(0.01); let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as usize; let arc = cached_store().await?; let store = arc.lock().await; let graph = crate::graph::build_graph_fast(&*store); let seeds: Vec<(String, f64)> = keys.iter() .filter_map(|k| { let resolved = store.resolve_key(k).ok()?; Some((resolved, 1.0)) }) .collect(); if seeds.is_empty() { anyhow::bail!("no valid seed keys found"); } let seed_set: std::collections::HashSet<&str> = seeds.iter() .map(|(k, _)| k.as_str()).collect(); let results = crate::search::spreading_activation( &seeds, &graph, &*store, max_hops, edge_decay, min_activation, ); Ok(results.iter() .filter(|(k, _)| !seed_set.contains(k.as_str())) .take(limit) .map(|(key, score)| format!(" {:.2} {}", score, key)) .collect::>().join("\n")) } async fn links(args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let node = MemoryNode::load(key) .ok_or_else(|| anyhow::anyhow!("node not found: {}", key))?; let mut out = format!("Neighbors of '{}':\n", key); for (target, strength, is_new) in &node.links { let tag = if *is_new { " (new)" } else { "" }; out.push_str(&format!(" ({:.2}) {}{}\n", strength, target, tag)); } Ok(out) } async fn link_set(args: &serde_json::Value) -> Result { let arc = cached_store().await?; let mut store = arc.lock().await; let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?; let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?; let strength = get_f64(args, "strength")? as f32; let old = store.set_link_strength(&s, &t, strength).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("{} ↔ {} strength {:.2} → {:.2}", s, t, old, strength)) } async fn link_add(agent: &Option>, args: &serde_json::Value) -> Result { let arc = cached_store().await?; let mut store = arc.lock().await; let s = store.resolve_key(get_str(args, "source")?).map_err(|e| anyhow::anyhow!("{}", e))?; let t = store.resolve_key(get_str(args, "target")?).map_err(|e| anyhow::anyhow!("{}", e))?; let prov = get_provenance(agent, args).await; let strength = store.add_link(&s, &t, &prov).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("linked {} → {} (strength={:.2})", s, t, strength)) } async fn delete(args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let arc = cached_store().await?; let mut store = arc.lock().await; let resolved = store.resolve_key(key).map_err(|e| anyhow::anyhow!("{}", e))?; store.delete_node(&resolved).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("deleted {}", resolved)) } async fn history(args: &serde_json::Value) -> Result { let key = get_str(args, "key")?; let full = args.get("full").and_then(|v| v.as_bool()).unwrap_or(false); let arc = cached_store().await?; let store = arc.lock().await; let key = store.resolve_key(key).unwrap_or_else(|_| key.to_string()); drop(store); let path = crate::store::nodes_path(); if !path.exists() { anyhow::bail!("No node log found"); } use std::io::BufReader; let file = std::fs::File::open(&path) .map_err(|e| anyhow::anyhow!("open {}: {}", path.display(), e))?; let mut reader = BufReader::new(file); let mut versions: Vec = Vec::new(); while let Ok(msg) = capnp::serialize::read_message(&mut reader, capnp::message::ReaderOptions::new()) { let log = msg.get_root::() .map_err(|e| anyhow::anyhow!("read log: {}", e))?; for node_reader in log.get_nodes() .map_err(|e| anyhow::anyhow!("get nodes: {}", e))? { let node = crate::store::Node::from_capnp_migrate(node_reader) .map_err(|e| anyhow::anyhow!("{}", e))?; if node.key == key { versions.push(node); } } } if versions.is_empty() { anyhow::bail!("No history found for '{}'", key); } let mut out = format!("{} versions of '{}':\n\n", versions.len(), key); for node in &versions { let ts = crate::store::format_datetime(node.timestamp); let deleted = if node.deleted { " DELETED" } else { "" }; if full { out.push_str(&format!("=== v{} {} {}{} w={:.3} {}b ===\n", node.version, ts, node.provenance, deleted, node.weight, node.content.len())); out.push_str(&node.content); out.push('\n'); } else { let preview = crate::util::first_n_chars(&node.content, 120).replace('\n', "\\n"); out.push_str(&format!("v{:<3} {} {:24} w={:.3} {}b{}\n {}\n", node.version, ts, node.provenance, node.weight, node.content.len(), deleted, preview)); } } Ok(out) } async fn weight_set(args: &serde_json::Value) -> Result { let arc = cached_store().await?; let mut store = arc.lock().await; let key = store.resolve_key(get_str(args, "key")?).map_err(|e| anyhow::anyhow!("{}", e))?; let weight = get_f64(args, "weight")? as f32; let (old, new) = store.set_weight(&key, weight).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("weight {} {:.2} → {:.2}", key, old, new)) } async fn rename(args: &serde_json::Value) -> Result { let old_key = get_str(args, "old_key")?; let new_key = get_str(args, "new_key")?; let arc = cached_store().await?; let mut store = arc.lock().await; let resolved = store.resolve_key(old_key).map_err(|e| anyhow::anyhow!("{}", e))?; store.rename_node(&resolved, new_key).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("Renamed '{}' → '{}'", resolved, new_key)) } async fn supersede(agent: &Option>, args: &serde_json::Value) -> Result { let old_key = get_str(args, "old_key")?; let new_key = get_str(args, "new_key")?; let reason = args.get("reason").and_then(|v| v.as_str()).unwrap_or("superseded"); let arc = cached_store().await?; let mut store = arc.lock().await; let content = store.nodes.get(old_key) .map(|n| n.content.clone()) .ok_or_else(|| anyhow::anyhow!("node not found: {}", old_key))?; let notice = format!("**SUPERSEDED** by `{}` — {}\n\n---\n\n{}", new_key, reason, content.trim()); let prov = get_provenance(agent, args).await; store.upsert_provenance(old_key, ¬ice, &prov) .map_err(|e| anyhow::anyhow!("{}", e))?; store.set_weight(old_key, 0.01).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; Ok(format!("superseded {} → {} ({})", old_key, new_key, reason)) } async fn query(args: &serde_json::Value) -> Result { let query_str = get_str(args, "query")?; let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("compact"); let arc = cached_store().await?; let store = arc.lock().await; let graph = store.build_graph(); match format { "full" => { // Rich output with full content, graph metrics, hub analysis let results = crate::query_parser::execute_query(&store, &graph, query_str) .map_err(|e| anyhow::anyhow!("{}", e))?; let keys: Vec = results.into_iter().map(|r| r.key).collect(); let items = crate::subconscious::defs::keys_to_replay_items(&store, &keys, &graph); Ok(crate::subconscious::prompts::format_nodes_section(&store, &items, &graph)) } _ => { // Compact output: handles count, select, and all expression types crate::query_parser::query_to_string(&store, &graph, query_str) .map_err(|e| anyhow::anyhow!("{}", e)) } } } // ── Journal tools ────────────────────────────────────────────── async fn journal_tail(args: &serde_json::Value) -> Result { let count = args.get("count").and_then(|v| v.as_u64()).unwrap_or(1); let level = args.get("level").and_then(|v| v.as_u64()).unwrap_or(0); let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("full"); let after = args.get("after").and_then(|v| v.as_str()); let type_name = match level { 0 => "episodic", 1 => "daily", 2 => "weekly", 3 => "monthly", _ => return Err(anyhow::anyhow!("invalid level: {} (0=journal, 1=daily, 2=weekly, 3=monthly)", level)), }; let mut q = format!("all | type:{} | sort:timestamp", type_name); if let Some(date) = after { // Convert date to age in seconds if let Ok(nd) = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") { let ts = nd.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(); let age = chrono::Utc::now().timestamp() - ts; q.push_str(&format!(" | age:<{}", age)); } } q.push_str(&format!(" | limit:{}", count)); query(&serde_json::json!({"query": q, "format": format})).await } fn level_to_node_type(level: i64) -> crate::store::NodeType { match level { 1 => crate::store::NodeType::EpisodicDaily, 2 => crate::store::NodeType::EpisodicWeekly, 3 => crate::store::NodeType::EpisodicMonthly, _ => crate::store::NodeType::EpisodicSession, } } async fn journal_new(agent: &Option>, args: &serde_json::Value) -> Result { let name = get_str(args, "name")?; let title = get_str(args, "title")?; let body = get_str(args, "body")?; let level = args.get("level").and_then(|v| v.as_i64()).unwrap_or(0); let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M"); let content = format!("## {} — {}\n\n{}", ts, title, body); let base_key: String = name.split_whitespace() .map(|w| w.to_lowercase() .chars().filter(|c| c.is_alphanumeric() || *c == '-') .collect::()) .filter(|s| !s.is_empty()) .collect::>() .join("-"); let base_key = if base_key.len() > 80 { &base_key[..80] } else { base_key.as_str() }; let arc = cached_store().await?; let mut store = arc.lock().await; let key = if store.nodes.contains_key(base_key) { let mut n = 2; loop { let candidate = format!("{}-{}", base_key, n); if !store.nodes.contains_key(&candidate) { break candidate; } n += 1; } } else { base_key.to_string() }; let mut node = crate::store::new_node(&key, &content); node.node_type = level_to_node_type(level); node.provenance = get_provenance(agent, args).await; store.upsert_node(node).map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; let word_count = body.split_whitespace().count(); Ok(format!("New entry '{}' ({} words)", title, word_count)) } async fn journal_update(agent: &Option>, args: &serde_json::Value) -> Result { let body = get_str(args, "body")?; let level = args.get("level").and_then(|v| v.as_i64()).unwrap_or(0); let node_type = level_to_node_type(level); let arc = cached_store().await?; let mut store = arc.lock().await; let latest_key = store.nodes.values() .filter(|n| n.node_type == node_type) .max_by_key(|n| n.created_at) .map(|n| n.key.clone()); let Some(key) = latest_key else { anyhow::bail!("no entry at level {} to update — use journal_new first", level); }; let existing = store.nodes.get(&key).unwrap().content.clone(); let new_content = format!("{}\n\n{}", existing.trim_end(), body); let prov = get_provenance(agent, args).await; store.upsert_provenance(&key, &new_content, &prov) .map_err(|e| anyhow::anyhow!("{}", e))?; store.save().map_err(|e| anyhow::anyhow!("{}", e))?; let word_count = body.split_whitespace().count(); Ok(format!("Updated last entry (+{} words)", word_count)) } // ── Graph tools ─────────────────────────────────────────────── async fn graph_topology() -> Result { let arc = cached_store().await?; let store = arc.lock().await; let graph = store.build_graph(); Ok(crate::subconscious::prompts::format_topology_header(&graph)) } async fn graph_health() -> Result { let arc = cached_store().await?; let store = arc.lock().await; let graph = store.build_graph(); Ok(crate::subconscious::prompts::format_health_section(&store, &graph)) } async fn graph_communities(args: &serde_json::Value) -> Result { let top_n = args.get("top_n").and_then(|v| v.as_u64()).unwrap_or(10) as usize; let min_size = args.get("min_size").and_then(|v| v.as_u64()).unwrap_or(3) as usize; let arc = cached_store().await?; let store = arc.lock().await; let g = store.build_graph(); let infos = g.community_info(); let total = infos.len(); let shown: Vec<_> = infos.into_iter() .filter(|c| c.size >= min_size) .take(top_n) .collect(); use std::fmt::Write; let mut out = String::new(); writeln!(out, "{} communities total ({} with size >= {})\n", total, shown.len(), min_size).ok(); writeln!(out, "{:<6} {:>5} {:>7} {:>7} members", "id", "size", "iso", "cross").ok(); writeln!(out, "{}", "-".repeat(70)).ok(); for c in &shown { let preview: Vec<&str> = c.members.iter() .take(5) .map(|s| s.as_str()) .collect(); let more = if c.size > 5 { format!(" +{}", c.size - 5) } else { String::new() }; writeln!(out, "{:<6} {:>5} {:>6.0}% {:>7} {}{}", c.id, c.size, c.isolation * 100.0, c.cross_edges, preview.join(", "), more).ok(); } Ok(out) } async fn graph_normalize_strengths(args: &serde_json::Value) -> Result { let apply = args.get("apply").and_then(|v| v.as_bool()).unwrap_or(false); let arc = cached_store().await?; let mut store = arc.lock().await; let graph = store.build_graph(); let strengths = graph.jaccard_strengths(); // Build lookup from (source_key, target_key) → new_strength let mut updates: std::collections::HashMap<(String, String), f32> = std::collections::HashMap::new(); for (a, b, s) in &strengths { updates.insert((a.clone(), b.clone()), *s); updates.insert((b.clone(), a.clone()), *s); } let mut changed = 0usize; let mut unchanged = 0usize; let mut temporal_skipped = 0usize; let mut delta_sum: f64 = 0.0; let mut buckets = [0usize; 10]; for rel in &mut store.relations { if rel.deleted { continue; } if rel.strength == 1.0 && rel.rel_type == crate::store::RelationType::Auto { temporal_skipped += 1; continue; } if let Some(&new_s) = updates.get(&(rel.source_key.clone(), rel.target_key.clone())) { let old_s = rel.strength; let delta = (new_s - old_s).abs(); if delta > 0.001 { delta_sum += delta as f64; if apply { rel.strength = new_s; } changed += 1; } else { unchanged += 1; } let bucket = ((new_s * 10.0) as usize).min(9); buckets[bucket] += 1; } } use std::fmt::Write; let mut out = String::new(); writeln!(out, "Normalize link strengths (Jaccard similarity)").ok(); writeln!(out, " Total edges in graph: {}", strengths.len()).ok(); writeln!(out, " Would change: {}", changed).ok(); writeln!(out, " Unchanged: {}", unchanged).ok(); writeln!(out, " Temporal (skipped): {}", temporal_skipped).ok(); if changed > 0 { writeln!(out, " Avg delta: {:.3}", delta_sum / changed as f64).ok(); } writeln!(out).ok(); writeln!(out, " Strength distribution:").ok(); for (i, &count) in buckets.iter().enumerate() { let lo = i as f32 / 10.0; let hi = lo + 0.1; let bar = "#".repeat(count / 50 + if count > 0 { 1 } else { 0 }); writeln!(out, " {:.1}-{:.1}: {:5} {}", lo, hi, count, bar).ok(); } if apply { store.save().map_err(|e| anyhow::anyhow!("{}", e))?; writeln!(out, "\nApplied {} strength updates.", changed).ok(); } else { writeln!(out, "\nDry run. Pass apply:true to write changes.").ok(); } Ok(out) }