From 7a3ce4f17d038a2bdd5cb927d5dabec354bdef6c Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Wed, 11 Mar 2026 00:13:58 -0400 Subject: [PATCH] counters: wire redb search hits into daemon RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit memory-search now records which nodes it finds via the daemon's record-hits RPC endpoint. The daemon owns the redb database exclusively, avoiding file locking between processes. The rename agent reads hit counts to deprioritize nodes that are actively being found by search — renaming them would break working queries. Daily check decays counters by 10% so stale hits fade. Also switched RPC command reading from fixed 256-byte buffer to read_to_string for unbounded command sizes. --- Cargo.lock | 10 ++++++ poc-memory/src/agents/daemon.rs | 51 ++++++++++++++++++++++++----- poc-memory/src/agents/prompts.rs | 13 ++++++-- poc-memory/src/bin/memory-search.rs | 8 +++++ 4 files changed, 72 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c6e228a..8555f26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1890,6 +1890,7 @@ dependencies = [ "peg", "ratatui", "rayon", + "redb", "regex", "rkyv", "serde", @@ -2229,6 +2230,15 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" +[[package]] +name = "redb" +version = "2.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eca1e9d98d5a7e9002d0013e18d5a9b000aee942eb134883a82f06ebffb6c01" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index ad63e45..5d2a96f 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -482,6 +482,13 @@ fn job_daily_check( ctx.log_line("checking health"); let _report = crate::neuro::daily_check(&store); + // Decay search hit counters (10% daily decay) + ctx.log_line("decaying search counters"); + match crate::counters::decay_all(0.9) { + Ok(removed) => ctx.log_line(&format!("decayed counters, removed {}", removed)), + Err(e) => ctx.log_line(&format!("counter decay failed: {}", e)), + } + // Compute graph health metrics for status display ctx.log_line("computing graph health"); let health = compute_graph_health(&store); @@ -1151,6 +1158,16 @@ pub fn rpc_consolidate() -> Result<(), String> { } } +/// Record search hits for the given keys (fire-and-forget from memory-search). +pub fn rpc_record_hits(keys: &[&str]) -> Result<(), String> { + if keys.is_empty() { return Ok(()); } + let cmd = format!("record-hits {}", keys.join("\t")); + match send_rpc(&cmd) { + Some(_) => Ok(()), + None => Err("Daemon not running.".into()), + } +} + pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> { let cmd = format!("run-agent {} {}", agent, count); match send_rpc(&cmd) { @@ -1220,14 +1237,9 @@ fn status_socket_loop( Ok((mut stream, _)) => { // Read command from client (with short timeout) stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); - let mut cmd_buf = [0u8; 256]; - let cmd = match stream.read(&mut cmd_buf) { - Ok(n) if n > 0 => std::str::from_utf8(&cmd_buf[..n]) - .unwrap_or("") - .trim() - .to_string(), - _ => String::new(), - }; + let mut cmd = String::new(); + let _ = stream.read_to_string(&mut cmd); + let cmd = cmd.trim().to_string(); match cmd.as_str() { "consolidate" => { @@ -1235,6 +1247,29 @@ fn status_socket_loop( let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n"); log_event("rpc", "consolidate", "triggered via socket"); } + cmd if cmd.starts_with("record-hits ") => { + let keys: Vec<&str> = cmd.strip_prefix("record-hits ") + .unwrap_or("") + .split('\t') + .filter(|k| !k.is_empty()) + .collect(); + if keys.is_empty() { + let _ = stream.write_all(b"{\"ok\":false,\"error\":\"no keys\"}\n"); + } else { + let n = keys.len(); + match crate::counters::record_search_hits(&keys) { + Ok(()) => { + let msg = format!("{{\"ok\":true,\"recorded\":{}}}\n", n); + let _ = stream.write_all(msg.as_bytes()); + } + Err(e) => { + let msg = format!("{{\"ok\":false,\"error\":\"{}\"}}\n", + e.replace('"', "'")); + let _ = stream.write_all(msg.as_bytes()); + } + } + } + } cmd if cmd.starts_with("run-agent ") => { let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); let agent_type = parts.get(1).unwrap_or(&"replay"); diff --git a/poc-memory/src/agents/prompts.rs b/poc-memory/src/agents/prompts.rs index 726db1b..b43fc4a 100644 --- a/poc-memory/src/agents/prompts.rs +++ b/poc-memory/src/agents/prompts.rs @@ -275,8 +275,17 @@ pub fn format_rename_candidates(store: &Store, count: usize) -> (Vec, St .map(|(k, n)| (k.as_str(), n)) .collect(); - // Least-recently visited first — naturally prioritizes unseen nodes - candidates.sort_by_key(|(key, _)| store.last_visited(key, "rename")); + // Deprioritize nodes actively found by search — renaming them would + // break working queries. Sort by: search hits (ascending), then + // least-recently visited. Nodes with many hits sink to the bottom. + let hit_counts = crate::counters::all_search_hits(); + let hit_map: std::collections::HashMap<&str, u64> = hit_counts.iter() + .map(|(k, v)| (k.as_str(), *v)) + .collect(); + candidates.sort_by_key(|(key, _)| { + let hits = hit_map.get(key).copied().unwrap_or(0); + (hits, store.last_visited(key, "rename")) + }); candidates.truncate(count); let keys: Vec = candidates.iter().map(|(k, _)| k.to_string()).collect(); diff --git a/poc-memory/src/bin/memory-search.rs b/poc-memory/src/bin/memory-search.rs index 5078e0b..74c2eb2 100644 --- a/poc-memory/src/bin/memory-search.rs +++ b/poc-memory/src/bin/memory-search.rs @@ -324,6 +324,14 @@ fn main() { } print!("{}", result_output); + // Record search hits with daemon (fire-and-forget) + let hit_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect(); + if debug { println!("[memory-search] recording {} search hits", hit_keys.len()); } + match poc_memory::agents::daemon::rpc_record_hits(&hit_keys) { + Ok(()) => { if debug { println!("[memory-search] hits recorded"); } } + Err(e) => { if debug { println!("[memory-search] hit recording failed: {}", e); } } + } + // Clean up stale state files (opportunistic) cleanup_stale_files(&state_dir, Duration::from_secs(86400)); }