cleanup: fix clippy warnings in daemon.rs

- Remove dead code (job_split_one function never called)
- Fix needless borrows (ctx.log_line(&format! -> format!))
- Fix slice clone ([key.clone()] -> std::slice::from_ref(&key))
- Collapse nested if statements
- Fix unwrap after is_some check
- Remove redundant closures in task spawning

Reduces daemon.rs from 2030 to 1825 lines.
This commit is contained in:
Kent Overstreet 2026-03-21 19:42:03 -04:00
parent a0d8b52c9a
commit 3640de444b

View file

@ -126,14 +126,13 @@ fn job_targeted_agent(
let job_name = format!("c-{}-target({})", agent, key);
run_job(ctx, &job_name, || {
let mut store = crate::store::Store::load()?;
ctx.log_line(&format!("targeting: {}", key));
let job = job_name.clone();
ctx.log_line(format!("targeting: {}", key));
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job, "progress", msg);
log_event(&job_name, "progress", msg);
};
super::knowledge::run_one_agent_with_keys(
&mut store, &agent, &[key.clone()], 5, "daemon", &log, false,
&mut store, &agent, std::slice::from_ref(&key), 5, "daemon", &log, false,
)?;
ctx.log_line("done");
Ok(())
@ -154,7 +153,6 @@ fn job_consolidation_agent(
let agent = agent_type.to_string();
let batch = batch_size;
let job_name = format!("c-{}", agent);
let job_name2 = job_name.clone();
let in_flight = Arc::clone(in_flight);
run_job(ctx, &job_name, || {
ctx.log_line("loading store");
@ -166,7 +164,7 @@ fn job_consolidation_agent(
let graph = store.build_graph();
{
let mut locked = in_flight.lock().unwrap();
ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)",
ctx.log_line(format!("running agent: {} (batch={}, {} in-flight)",
agent, batch, locked.len()));
// Run the agent's query, filtering out in-flight nodes
@ -214,7 +212,7 @@ fn job_consolidation_agent(
let log = |msg: &str| {
ctx.log_line(msg);
log_event(&job_name2, "progress", msg);
log_event(&job_name, "progress", msg);
};
// Use run_one_agent_with_keys — we already selected seeds above,
// no need to re-run the query.
@ -246,7 +244,7 @@ fn job_rename_agent(
let mut store = crate::store::Store::load()?;
let batch = if batch_size == 0 { 10 } else { batch_size };
ctx.log_line(&format!("running rename agent (batch={})", batch));
ctx.log_line(format!("running rename agent (batch={})", batch));
let log = |msg: &str| ctx.log_line(msg);
let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate", &log, false)?;
@ -268,25 +266,25 @@ fn job_rename_agent(
let resolved = match store.resolve_key(old_key) {
Ok(k) => k,
Err(e) => {
ctx.log_line(&format!("skip: {}{}: {}", old_key, new_key, e));
ctx.log_line(format!("skip: {}{}: {}", old_key, new_key, e));
skipped += 1;
continue;
}
};
if store.nodes.contains_key(new_key) {
ctx.log_line(&format!("skip: {} already exists", new_key));
ctx.log_line(format!("skip: {} already exists", new_key));
skipped += 1;
continue;
}
match store.rename_node(&resolved, new_key) {
Ok(()) => {
ctx.log_line(&format!("renamed: {}{}", resolved, new_key));
ctx.log_line(format!("renamed: {}{}", resolved, new_key));
applied += 1;
}
Err(e) => {
ctx.log_line(&format!("error: {}{}: {}", resolved, new_key, e));
ctx.log_line(format!("error: {}{}: {}", resolved, new_key, e));
skipped += 1;
}
}
@ -296,211 +294,7 @@ fn job_rename_agent(
store.save()?;
}
ctx.log_line(&format!("done: {} applied, {} skipped", applied, skipped));
Ok(())
})
}
/// Run the split agent: two-phase decomposition of large nodes.
///
/// Phase 1: Send node + neighbors to LLM, get back a JSON split plan
/// (child keys, descriptions, section hints).
/// Phase 2: For each child, send parent content + child description to LLM,
/// get back the extracted/reorganized content for that child.
///
/// This handles arbitrarily large nodes because the output of each phase 2
/// call is proportional to one child, not the whole parent.
/// Split a single node by key. Called as an independent task so multiple
/// splits can run in parallel. Each task loads the store fresh, checks the
/// node still exists and hasn't been split, does the LLM work, then saves.
fn job_split_one(
ctx: &ExecutionContext,
parent_key: String,
) -> Result<(), TaskError> {
run_job(ctx, "c-split", || {
ctx.log_line(&format!("loading store for {}", parent_key));
let mut store = crate::store::Store::load()?;
// Check node still exists and hasn't been deleted/split already
let content_len = match store.nodes.get(parent_key.as_str()) {
Some(n) if !n.deleted => n.content.len(),
_ => {
ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key));
return Ok(());
}
};
ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len));
// Phase 1: get split plan
let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?;
ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len()));
let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?;
let plan = match super::llm::parse_json_response(&plan_response) {
Ok(v) => v,
Err(e) => {
ctx.log_line(&format!("phase 1 parse error: {}", e));
return Ok(());
}
};
let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or("");
if action == "keep" {
let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!("keep: {} ({})", parent_key, reason));
return Ok(());
}
if action != "split" {
ctx.log_line(&format!("unexpected action: {}", action));
return Ok(());
}
let children_plan = match plan.get("children").and_then(|v| v.as_array()) {
Some(c) if c.len() >= 2 => c,
_ => {
ctx.log_line("plan has fewer than 2 children, skipping");
return Ok(());
}
};
ctx.log_line(&format!("phase 1: {} children planned", children_plan.len()));
for child in children_plan {
let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?");
let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!(" planned: {}{}", key, desc));
}
// Phase 2: extract content for each child
let mut children: Vec<(String, String)> = Vec::new();
// Collect neighbor assignments from plan: child_key -> [neighbor_keys]
let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new();
for child_plan in children_plan {
let child_key = match child_plan.get("key").and_then(|v| v.as_str()) {
Some(k) => k.to_string(),
None => continue,
};
let child_desc = child_plan.get("description")
.and_then(|v| v.as_str()).unwrap_or("");
let child_sections = child_plan.get("sections")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", "))
.unwrap_or_default();
let child_neighbors: Vec<String> = child_plan.get("neighbors")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect())
.unwrap_or_default();
neighbor_map.insert(child_key.clone(), child_neighbors);
ctx.log_line(&format!("phase 2: extracting {}", child_key));
let extract_prompt = super::prompts::split_extract_prompt(
&store, &parent_key, &child_key, child_desc, &child_sections)?;
ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len()));
let content = match super::llm::call_sonnet("split-extract", &extract_prompt) {
Ok(c) => c,
Err(e) => {
ctx.log_line(&format!(" extract error: {}", e));
continue;
}
};
ctx.log_line(&format!(" extracted: {} chars", content.len()));
children.push((child_key, content));
}
if children.len() < 2 {
ctx.log_line(&format!("only {} children extracted, skipping", children.len()));
return Ok(());
}
// Reload store before mutations — another split may have saved meanwhile
store = crate::store::Store::load()?;
// Re-check parent still exists after reload
if !store.nodes.contains_key(parent_key.as_str()) ||
store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) {
ctx.log_line(&format!("skip: {} was split by another task", parent_key));
return Ok(());
}
// Collect parent's edges before modifications
let parent_edges: Vec<_> = store.relations.iter()
.filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key))
.cloned()
.collect();
// Create child nodes
let mut child_uuids: Vec<([u8; 16], String)> = Vec::new();
for (child_key, content) in &children {
if store.nodes.contains_key(child_key.as_str()) {
ctx.log_line(&format!(" skip: {} already exists", child_key));
continue;
}
store.upsert_provenance(child_key, content,
"consolidate:write")?;
let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid;
child_uuids.push((uuid, child_key.clone()));
ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len()));
}
// Inherit edges using agent's neighbor assignments from the plan
for (child_uuid, child_key) in &child_uuids {
let neighbors = match neighbor_map.get(child_key) {
Some(n) => n,
None => continue,
};
for neighbor_key in neighbors {
// Find the parent edge for this neighbor to inherit its strength
let parent_edge = parent_edges.iter().find(|r| {
r.source_key == *neighbor_key || r.target_key == *neighbor_key
});
let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3);
let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) {
Some(n) => n.uuid,
None => continue,
};
let rel = crate::store::new_relation(
*child_uuid, neighbor_uuid,
crate::store::RelationType::Auto, strength,
child_key, neighbor_key,
);
store.add_relation(rel).ok();
}
}
// Link siblings
for i in 0..child_uuids.len() {
for j in (i+1)..child_uuids.len() {
let rel = crate::store::new_relation(
child_uuids[i].0, child_uuids[j].0,
crate::store::RelationType::Auto, 0.5,
&child_uuids[i].1, &child_uuids[j].1,
);
store.add_relation(rel).ok();
}
}
// Tombstone parent
if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) {
parent.deleted = true;
parent.version += 1;
let tombstone = parent.clone();
store.append_nodes(std::slice::from_ref(&tombstone)).ok();
}
store.nodes.remove(parent_key.as_str());
ctx.log_line(&format!("split complete: {}{} children", parent_key, child_uuids.len()));
store.save()?;
ctx.log_line(format!("done: {} applied, {} skipped", applied, skipped));
Ok(())
})
}
@ -512,7 +306,7 @@ fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> {
let mut store = crate::store::Store::load()?;
ctx.log_line("linking orphans");
let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15);
ctx.log_line(&format!("{} orphans, {} links added", orphans, added));
ctx.log_line(format!("{} orphans, {} links added", orphans, added));
Ok(())
})
}
@ -526,7 +320,7 @@ fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> {
match store.cap_degree(50) {
Ok((hubs, pruned)) => {
store.save()?;
ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned));
ctx.log_line(format!("{} hubs capped, {} edges pruned", hubs, pruned));
Ok(())
}
Err(e) => Err(e),
@ -543,7 +337,7 @@ fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> {
let links = super::digest::parse_all_digest_links(&store);
let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links);
store.save()?;
ctx.log_line(&format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks));
ctx.log_line(format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks));
Ok(())
})
}
@ -577,8 +371,8 @@ fn job_daily_check(
// Decay search hit counters (10% daily decay)
ctx.log_line("decaying search counters");
match crate::counters::decay_all(0.9) {
Ok(removed) => ctx.log_line(&format!("decayed counters, removed {}", removed)),
Err(e) => ctx.log_line(&format!("counter decay failed: {}", e)),
Ok(removed) => ctx.log_line(format!("decayed counters, removed {}", removed)),
Err(e) => ctx.log_line(format!("counter decay failed: {}", e)),
}
// Compute graph health metrics for status display
@ -648,17 +442,14 @@ fn find_stale_sessions() -> Vec<PathBuf> {
let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
for f in files.filter_map(|e| e.ok()) {
let path = f.path();
if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
if let Ok(meta) = path.metadata() {
// Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
if meta.len() < MIN_SESSION_BYTES { continue; }
if let Ok(mtime) = meta.modified() {
let age = now.duration_since(mtime).unwrap_or_default();
if age.as_secs() >= SESSION_STALE_SECS {
stale.push(path);
}
}
}
if !path.extension().map(|x| x == "jsonl").unwrap_or(false) { continue; }
let Ok(meta) = path.metadata() else { continue };
// Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
if meta.len() < MIN_SESSION_BYTES { continue; }
let Ok(mtime) = meta.modified() else { continue };
let age = now.duration_since(mtime).unwrap_or_default();
if age.as_secs() >= SESSION_STALE_SECS {
stale.push(path);
}
}
}
@ -681,9 +472,8 @@ fn is_file_open(path: &Path) -> bool {
let fd_dir = proc_entry.path().join("fd");
let Ok(fds) = fs::read_dir(&fd_dir) else { continue };
for fd in fds.filter_map(|e| e.ok()) {
if let Ok(link) = fs::read_link(fd.path()) {
if link == target { return true; }
}
let Ok(link) = fs::read_link(fd.path()) else { continue };
if link == target { return true; }
}
}
false
@ -925,7 +715,9 @@ pub fn run_daemon() -> Result<(), String> {
// Check retry backoff before doing any work
if let Some((next_retry, _)) = retry_backoff.get(&filename) {
if now < *next_retry {
if now >= *next_retry {
// Backoff expired, proceed
} else {
backed_off += 1;
continue;
}
@ -1112,12 +904,12 @@ pub fn run_daemon() -> Result<(), String> {
// Consolidation cycle: every 6 hours (wait for health check to cache metrics first)
let gh = graph_health_sched.lock().unwrap().clone();
if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL && gh.is_some() {
let Some(h) = gh.as_ref() else { continue };
if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL {
log_event("scheduler", "consolidation-trigger",
&format!("{} (every 6h)", today));
// Use cached graph health plan (from consolidation_plan_quick).
let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above
let plan = crate::neuro::ConsolidationPlan {
counts: h.plan_counts.clone(),
run_health: true,
@ -1166,7 +958,7 @@ pub fn run_daemon() -> Result<(), String> {
// Phase 2: Link orphans (CPU-only, no LLM)
let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today))
.retries(1)
.init(move |ctx| job_link_orphans(ctx));
.init(job_link_orphans);
if let Some(ref dep) = prev_agent {
orphans.depend_on(dep);
}
@ -1175,7 +967,7 @@ pub fn run_daemon() -> Result<(), String> {
// Phase 3: Cap degree
let mut cap = choir_sched.spawn(format!("c-cap:{}", today))
.retries(1)
.init(move |ctx| job_cap_degree(ctx));
.init(job_cap_degree);
cap.depend_on(&orphans);
let cap = cap.run();
@ -1183,14 +975,14 @@ pub fn run_daemon() -> Result<(), String> {
let mut digest = choir_sched.spawn(format!("c-digest:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |ctx| job_digest(ctx));
.init(job_digest);
digest.depend_on(&cap);
let digest = digest.run();
// Phase 5: Apply digest links
let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today))
.retries(1)
.init(move |ctx| job_digest_links(ctx));
.init(job_digest_links);
digest_links.depend_on(&digest);
let digest_links = digest_links.run();
@ -1198,7 +990,7 @@ pub fn run_daemon() -> Result<(), String> {
let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today))
.resource(&llm_sched)
.retries(1)
.init(move |ctx| job_knowledge_loop(ctx));
.init(job_knowledge_loop);
knowledge.depend_on(&digest_links);
*last_daily_sched.lock().unwrap() = Some(today);
@ -1648,11 +1440,9 @@ pub fn show_status() -> Result<(), String> {
// Show recent failures
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
if let Some(ref r) = t.result {
if let Some(ref err) = r.error {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
if let Some(ref err) = t.result.as_ref().and_then(|r| r.error.as_ref()) {
let short = if err.len() > 80 { &err[..80] } else { err };
eprintln!("{}: {}", t.name, short);
}
}
eprintln!();
@ -1926,21 +1716,26 @@ pub fn install_hook() -> Result<(), String> {
/// 2. daemon.log started events (for completed/failed tasks)
pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> {
// Try running tasks first
if let Some(status_json) = send_rpc_pub("") {
if let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) {
if let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) {
for t in tasks {
let name = t.get("name").and_then(|n| n.as_str()).unwrap_or("");
if name.contains(task_name) {
if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) {
return tail_file(lp, lines);
}
}
}
}
let Some(status_json) = send_rpc_pub("") else {
return search_log_fallback(task_name, lines);
};
let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_json) else {
return search_log_fallback(task_name, lines);
};
let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) else {
return search_log_fallback(task_name, lines);
};
for t in tasks {
let name = t.get("name").and_then(|n| n.as_str()).unwrap_or("");
if !name.contains(task_name) { continue; }
if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) {
return tail_file(lp, lines);
}
}
search_log_fallback(task_name, lines)
}
fn search_log_fallback(task_name: &str, lines: usize) -> Result<(), String> {
// Fall back to searching daemon.log for the most recent started event with a log path
let log = log_path();
if log.exists() {