From 3640de444bf5d70cc8ea42e5473326a60652a4d1 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sat, 21 Mar 2026 19:42:03 -0400 Subject: [PATCH] cleanup: fix clippy warnings in daemon.rs - Remove dead code (job_split_one function never called) - Fix needless borrows (ctx.log_line(&format! -> format!)) - Fix slice clone ([key.clone()] -> std::slice::from_ref(&key)) - Collapse nested if statements - Fix unwrap after is_some check - Remove redundant closures in task spawning Reduces daemon.rs from 2030 to 1825 lines. --- poc-memory/src/agents/daemon.rs | 317 ++++++-------------------------- 1 file changed, 56 insertions(+), 261 deletions(-) diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 0ffc95f..81498a7 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -126,14 +126,13 @@ fn job_targeted_agent( let job_name = format!("c-{}-target({})", agent, key); run_job(ctx, &job_name, || { let mut store = crate::store::Store::load()?; - ctx.log_line(&format!("targeting: {}", key)); - let job = job_name.clone(); + ctx.log_line(format!("targeting: {}", key)); let log = |msg: &str| { ctx.log_line(msg); - log_event(&job, "progress", msg); + log_event(&job_name, "progress", msg); }; super::knowledge::run_one_agent_with_keys( - &mut store, &agent, &[key.clone()], 5, "daemon", &log, false, + &mut store, &agent, std::slice::from_ref(&key), 5, "daemon", &log, false, )?; ctx.log_line("done"); Ok(()) @@ -154,7 +153,6 @@ fn job_consolidation_agent( let agent = agent_type.to_string(); let batch = batch_size; let job_name = format!("c-{}", agent); - let job_name2 = job_name.clone(); let in_flight = Arc::clone(in_flight); run_job(ctx, &job_name, || { ctx.log_line("loading store"); @@ -166,7 +164,7 @@ fn job_consolidation_agent( let graph = store.build_graph(); { let mut locked = in_flight.lock().unwrap(); - ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)", + ctx.log_line(format!("running agent: {} 
(batch={}, {} in-flight)", agent, batch, locked.len())); // Run the agent's query, filtering out in-flight nodes @@ -214,7 +212,7 @@ fn job_consolidation_agent( let log = |msg: &str| { ctx.log_line(msg); - log_event(&job_name2, "progress", msg); + log_event(&job_name, "progress", msg); }; // Use run_one_agent_with_keys — we already selected seeds above, // no need to re-run the query. @@ -246,7 +244,7 @@ fn job_rename_agent( let mut store = crate::store::Store::load()?; let batch = if batch_size == 0 { 10 } else { batch_size }; - ctx.log_line(&format!("running rename agent (batch={})", batch)); + ctx.log_line(format!("running rename agent (batch={})", batch)); let log = |msg: &str| ctx.log_line(msg); let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate", &log, false)?; @@ -268,25 +266,25 @@ fn job_rename_agent( let resolved = match store.resolve_key(old_key) { Ok(k) => k, Err(e) => { - ctx.log_line(&format!("skip: {} → {}: {}", old_key, new_key, e)); + ctx.log_line(format!("skip: {} → {}: {}", old_key, new_key, e)); skipped += 1; continue; } }; if store.nodes.contains_key(new_key) { - ctx.log_line(&format!("skip: {} already exists", new_key)); + ctx.log_line(format!("skip: {} already exists", new_key)); skipped += 1; continue; } match store.rename_node(&resolved, new_key) { Ok(()) => { - ctx.log_line(&format!("renamed: {} → {}", resolved, new_key)); + ctx.log_line(format!("renamed: {} → {}", resolved, new_key)); applied += 1; } Err(e) => { - ctx.log_line(&format!("error: {} → {}: {}", resolved, new_key, e)); + ctx.log_line(format!("error: {} → {}: {}", resolved, new_key, e)); skipped += 1; } } @@ -296,211 +294,7 @@ fn job_rename_agent( store.save()?; } - ctx.log_line(&format!("done: {} applied, {} skipped", applied, skipped)); - Ok(()) - }) -} - -/// Run the split agent: two-phase decomposition of large nodes. -/// -/// Phase 1: Send node + neighbors to LLM, get back a JSON split plan -/// (child keys, descriptions, section hints). 
-/// Phase 2: For each child, send parent content + child description to LLM, -/// get back the extracted/reorganized content for that child. -/// -/// This handles arbitrarily large nodes because the output of each phase 2 -/// call is proportional to one child, not the whole parent. -/// Split a single node by key. Called as an independent task so multiple -/// splits can run in parallel. Each task loads the store fresh, checks the -/// node still exists and hasn't been split, does the LLM work, then saves. -fn job_split_one( - ctx: &ExecutionContext, - parent_key: String, -) -> Result<(), TaskError> { - run_job(ctx, "c-split", || { - ctx.log_line(&format!("loading store for {}", parent_key)); - let mut store = crate::store::Store::load()?; - - // Check node still exists and hasn't been deleted/split already - let content_len = match store.nodes.get(parent_key.as_str()) { - Some(n) if !n.deleted => n.content.len(), - _ => { - ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key)); - return Ok(()); - } - }; - - ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len)); - - // Phase 1: get split plan - let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?; - ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len())); - - let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?; - let plan = match super::llm::parse_json_response(&plan_response) { - Ok(v) => v, - Err(e) => { - ctx.log_line(&format!("phase 1 parse error: {}", e)); - return Ok(()); - } - }; - - let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or(""); - if action == "keep" { - let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or(""); - ctx.log_line(&format!("keep: {} ({})", parent_key, reason)); - return Ok(()); - } - if action != "split" { - ctx.log_line(&format!("unexpected action: {}", action)); - return Ok(()); - } - - let children_plan = match plan.get("children").and_then(|v| 
v.as_array()) { - Some(c) if c.len() >= 2 => c, - _ => { - ctx.log_line("plan has fewer than 2 children, skipping"); - return Ok(()); - } - }; - - ctx.log_line(&format!("phase 1: {} children planned", children_plan.len())); - for child in children_plan { - let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?"); - let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or(""); - ctx.log_line(&format!(" planned: {} — {}", key, desc)); - } - - // Phase 2: extract content for each child - let mut children: Vec<(String, String)> = Vec::new(); - // Collect neighbor assignments from plan: child_key -> [neighbor_keys] - let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new(); - - for child_plan in children_plan { - let child_key = match child_plan.get("key").and_then(|v| v.as_str()) { - Some(k) => k.to_string(), - None => continue, - }; - let child_desc = child_plan.get("description") - .and_then(|v| v.as_str()).unwrap_or(""); - let child_sections = child_plan.get("sections") - .and_then(|v| v.as_array()) - .map(|arr| arr.iter() - .filter_map(|v| v.as_str()) - .collect::<Vec<_>>() - .join(", ")) - .unwrap_or_default(); - let child_neighbors: Vec<String> = child_plan.get("neighbors") - .and_then(|v| v.as_array()) - .map(|arr| arr.iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect()) - .unwrap_or_default(); - neighbor_map.insert(child_key.clone(), child_neighbors); - - ctx.log_line(&format!("phase 2: extracting {}", child_key)); - - let extract_prompt = super::prompts::split_extract_prompt( - &store, &parent_key, &child_key, child_desc, &child_sections)?; - ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len())); - - let content = match super::llm::call_sonnet("split-extract", &extract_prompt) { - Ok(c) => c, - Err(e) => { - ctx.log_line(&format!(" extract error: {}", e)); - continue; - } - }; - - ctx.log_line(&format!(" extracted: {} chars", content.len())); - children.push((child_key, content)); - } - - if children.len() < 2 { - 
ctx.log_line(&format!("only {} children extracted, skipping", children.len())); - return Ok(()); - } - - // Reload store before mutations — another split may have saved meanwhile - store = crate::store::Store::load()?; - - // Re-check parent still exists after reload - if !store.nodes.contains_key(parent_key.as_str()) || - store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) { - ctx.log_line(&format!("skip: {} was split by another task", parent_key)); - return Ok(()); - } - - // Collect parent's edges before modifications - let parent_edges: Vec<_> = store.relations.iter() - .filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key)) - .cloned() - .collect(); - - // Create child nodes - let mut child_uuids: Vec<([u8; 16], String)> = Vec::new(); - for (child_key, content) in &children { - if store.nodes.contains_key(child_key.as_str()) { - ctx.log_line(&format!(" skip: {} already exists", child_key)); - continue; - } - store.upsert_provenance(child_key, content, - "consolidate:write")?; - let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid; - child_uuids.push((uuid, child_key.clone())); - ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len())); - } - - // Inherit edges using agent's neighbor assignments from the plan - for (child_uuid, child_key) in &child_uuids { - let neighbors = match neighbor_map.get(child_key) { - Some(n) => n, - None => continue, - }; - for neighbor_key in neighbors { - // Find the parent edge for this neighbor to inherit its strength - let parent_edge = parent_edges.iter().find(|r| { - r.source_key == *neighbor_key || r.target_key == *neighbor_key - }); - let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3); - - let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) { - Some(n) => n.uuid, - None => continue, - }; - - let rel = crate::store::new_relation( - *child_uuid, neighbor_uuid, - crate::store::RelationType::Auto, strength, - child_key, neighbor_key, 
- ); - store.add_relation(rel).ok(); - } - } - - // Link siblings - for i in 0..child_uuids.len() { - for j in (i+1)..child_uuids.len() { - let rel = crate::store::new_relation( - child_uuids[i].0, child_uuids[j].0, - crate::store::RelationType::Auto, 0.5, - &child_uuids[i].1, &child_uuids[j].1, - ); - store.add_relation(rel).ok(); - } - } - - // Tombstone parent - if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) { - parent.deleted = true; - parent.version += 1; - let tombstone = parent.clone(); - store.append_nodes(std::slice::from_ref(&tombstone)).ok(); - } - store.nodes.remove(parent_key.as_str()); - - ctx.log_line(&format!("split complete: {} → {} children", parent_key, child_uuids.len())); - store.save()?; + ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped)); Ok(()) }) } @@ -512,7 +306,7 @@ fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> { let mut store = crate::store::Store::load()?; ctx.log_line("linking orphans"); let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15); - ctx.log_line(&format!("{} orphans, {} links added", orphans, added)); + ctx.log_line(format!("{} orphans, {} links added", orphans, added)); Ok(()) }) } @@ -526,7 +320,7 @@ fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> { match store.cap_degree(50) { Ok((hubs, pruned)) => { store.save()?; - ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned)); + ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned)); Ok(()) } Err(e) => Err(e), @@ -543,7 +337,7 @@ fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> { let links = super::digest::parse_all_digest_links(&store); let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links); store.save()?; - ctx.log_line(&format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); + ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); 
Ok(()) }) } @@ -577,8 +371,8 @@ fn job_daily_check( // Decay search hit counters (10% daily decay) ctx.log_line("decaying search counters"); match crate::counters::decay_all(0.9) { - Ok(removed) => ctx.log_line(&format!("decayed counters, removed {}", removed)), - Err(e) => ctx.log_line(&format!("counter decay failed: {}", e)), + Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)), + Err(e) => ctx.log_line(format!("counter decay failed: {}", e)), } // Compute graph health metrics for status display @@ -648,17 +442,14 @@ fn find_stale_sessions() -> Vec<PathBuf> { let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; for f in files.filter_map(|e| e.ok()) { let path = f.path(); - if path.extension().map(|x| x == "jsonl").unwrap_or(false) { - if let Ok(meta) = path.metadata() { - // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) - if meta.len() < MIN_SESSION_BYTES { continue; } - if let Ok(mtime) = meta.modified() { - let age = now.duration_since(mtime).unwrap_or_default(); - if age.as_secs() >= SESSION_STALE_SECS { - stale.push(path); - } - } - } + if !path.extension().map(|x| x == "jsonl").unwrap_or(false) { continue; } + let Ok(meta) = path.metadata() else { continue }; + // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) + if meta.len() < MIN_SESSION_BYTES { continue; } + let Ok(mtime) = meta.modified() else { continue }; + let age = now.duration_since(mtime).unwrap_or_default(); + if age.as_secs() >= SESSION_STALE_SECS { + stale.push(path); } } } @@ -681,9 +472,8 @@ fn is_file_open(path: &Path) -> bool { let fd_dir = proc_entry.path().join("fd"); let Ok(fds) = fs::read_dir(&fd_dir) else { continue }; for fd in fds.filter_map(|e| e.ok()) { - if let Ok(link) = fs::read_link(fd.path()) { - if link == target { return true; } - } + let Ok(link) = fs::read_link(fd.path()) else { continue }; + if link == target { return true; } } } false @@ -925,7 +715,9 @@ pub fn run_daemon() -> Result<(), String> { // Check 
retry backoff before doing any work if let Some((next_retry, _)) = retry_backoff.get(&filename) { - if now < *next_retry { + if now >= *next_retry { + // Backoff expired, proceed + } else { backed_off += 1; continue; } @@ -1112,12 +904,12 @@ pub fn run_daemon() -> Result<(), String> { // Consolidation cycle: every 6 hours (wait for health check to cache metrics first) let gh = graph_health_sched.lock().unwrap().clone(); - if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL && gh.is_some() { + let Some(h) = gh.as_ref() else { continue }; + if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL { log_event("scheduler", "consolidation-trigger", &format!("{} (every 6h)", today)); // Use cached graph health plan (from consolidation_plan_quick). - let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above let plan = crate::neuro::ConsolidationPlan { counts: h.plan_counts.clone(), run_health: true, @@ -1166,7 +958,7 @@ pub fn run_daemon() -> Result<(), String> { // Phase 2: Link orphans (CPU-only, no LLM) let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today)) .retries(1) - .init(move |ctx| job_link_orphans(ctx)); + .init(job_link_orphans); if let Some(ref dep) = prev_agent { orphans.depend_on(dep); } @@ -1175,7 +967,7 @@ pub fn run_daemon() -> Result<(), String> { // Phase 3: Cap degree let mut cap = choir_sched.spawn(format!("c-cap:{}", today)) .retries(1) - .init(move |ctx| job_cap_degree(ctx)); + .init(job_cap_degree); cap.depend_on(&orphans); let cap = cap.run(); @@ -1183,14 +975,14 @@ pub fn run_daemon() -> Result<(), String> { let mut digest = choir_sched.spawn(format!("c-digest:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |ctx| job_digest(ctx)); + .init(job_digest); digest.depend_on(&cap); let digest = digest.run(); // Phase 5: Apply digest links let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today)) .retries(1) - .init(move |ctx| job_digest_links(ctx)); + .init(job_digest_links); 
digest_links.depend_on(&digest); let digest_links = digest_links.run(); @@ -1198,7 +990,7 @@ pub fn run_daemon() -> Result<(), String> { let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today)) .resource(&llm_sched) .retries(1) - .init(move |ctx| job_knowledge_loop(ctx)); + .init(job_knowledge_loop); knowledge.depend_on(&digest_links); *last_daily_sched.lock().unwrap() = Some(today); @@ -1648,11 +1440,9 @@ pub fn show_status() -> Result<(), String> { // Show recent failures for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { - if let Some(ref r) = t.result { - if let Some(ref err) = r.error { - let short = if err.len() > 80 { &err[..80] } else { err }; - eprintln!(" ✗ {}: {}", t.name, short); - } + if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) { + let short = if err.len() > 80 { &err[..80] } else { err }; + eprintln!(" ✗ {}: {}", t.name, short); } } eprintln!(); @@ -1926,21 +1716,26 @@ pub fn install_hook() -> Result<(), String> { /// 2. 
daemon.log started events (for completed/failed tasks) pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> { // Try running tasks first - if let Some(status_json) = send_rpc_pub("") { - if let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) { - if let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) { - for t in tasks { - let name = t.get("name").and_then(|n| n.as_str()).unwrap_or(""); - if name.contains(task_name) { - if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) { - return tail_file(lp, lines); - } - } - } - } + let Some(status_json) = send_rpc_pub("") else { + return search_log_fallback(task_name, lines); + }; + let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) else { + return search_log_fallback(task_name, lines); + }; + let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) else { + return search_log_fallback(task_name, lines); + }; + for t in tasks { + let name = t.get("name").and_then(|n| n.as_str()).unwrap_or(""); + if !name.contains(task_name) { continue; } + if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) { + return tail_file(lp, lines); } } } + search_log_fallback(task_name, lines) +} +fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> { // Fall back to searching daemon.log for the most recent started event with a log path let log = log_path(); if log.exists() {