counters: wire redb search hits into daemon RPC

memory-search now records which nodes it finds via the daemon's
record-hits RPC endpoint. The daemon owns the redb database
exclusively, avoiding file locking between processes.

The rename agent reads hit counts to deprioritize nodes that are
actively being found by search — renaming them would break working
queries. Daily check decays counters by 10% so stale hits fade.

Also switched RPC command reading from fixed 256-byte buffer to
read_to_string for unbounded command sizes.
This commit is contained in:
ProofOfConcept 2026-03-11 00:13:58 -04:00
parent 884939b146
commit 7a3ce4f17d
4 changed files with 72 additions and 10 deletions

10
Cargo.lock generated
View file

@ -1890,6 +1890,7 @@ dependencies = [
"peg", "peg",
"ratatui", "ratatui",
"rayon", "rayon",
"redb",
"regex", "regex",
"rkyv", "rkyv",
"serde", "serde",
@ -2229,6 +2230,15 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430"
[[package]]
name = "redb"
version = "2.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eca1e9d98d5a7e9002d0013e18d5a9b000aee942eb134883a82f06ebffb6c01"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.18" version = "0.5.18"

View file

@ -482,6 +482,13 @@ fn job_daily_check(
ctx.log_line("checking health"); ctx.log_line("checking health");
let _report = crate::neuro::daily_check(&store); let _report = crate::neuro::daily_check(&store);
// Decay search hit counters (10% daily decay)
ctx.log_line("decaying search counters");
match crate::counters::decay_all(0.9) {
Ok(removed) => ctx.log_line(&format!("decayed counters, removed {}", removed)),
Err(e) => ctx.log_line(&format!("counter decay failed: {}", e)),
}
// Compute graph health metrics for status display // Compute graph health metrics for status display
ctx.log_line("computing graph health"); ctx.log_line("computing graph health");
let health = compute_graph_health(&store); let health = compute_graph_health(&store);
@ -1151,6 +1158,16 @@ pub fn rpc_consolidate() -> Result<(), String> {
} }
} }
/// Record search hits for the given keys (fire-and-forget from memory-search).
pub fn rpc_record_hits(keys: &[&str]) -> Result<(), String> {
if keys.is_empty() { return Ok(()); }
let cmd = format!("record-hits {}", keys.join("\t"));
match send_rpc(&cmd) {
Some(_) => Ok(()),
None => Err("Daemon not running.".into()),
}
}
pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> { pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
let cmd = format!("run-agent {} {}", agent, count); let cmd = format!("run-agent {} {}", agent, count);
match send_rpc(&cmd) { match send_rpc(&cmd) {
@ -1220,14 +1237,9 @@ fn status_socket_loop(
Ok((mut stream, _)) => { Ok((mut stream, _)) => {
// Read command from client (with short timeout) // Read command from client (with short timeout)
stream.set_read_timeout(Some(Duration::from_millis(100))).ok(); stream.set_read_timeout(Some(Duration::from_millis(100))).ok();
let mut cmd_buf = [0u8; 256]; let mut cmd = String::new();
let cmd = match stream.read(&mut cmd_buf) { let _ = stream.read_to_string(&mut cmd);
Ok(n) if n > 0 => std::str::from_utf8(&cmd_buf[..n]) let cmd = cmd.trim().to_string();
.unwrap_or("")
.trim()
.to_string(),
_ => String::new(),
};
match cmd.as_str() { match cmd.as_str() {
"consolidate" => { "consolidate" => {
@ -1235,6 +1247,29 @@ fn status_socket_loop(
let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n"); let _ = stream.write_all(b"{\"ok\":true,\"action\":\"consolidation scheduled\"}\n");
log_event("rpc", "consolidate", "triggered via socket"); log_event("rpc", "consolidate", "triggered via socket");
} }
cmd if cmd.starts_with("record-hits ") => {
let keys: Vec<&str> = cmd.strip_prefix("record-hits ")
.unwrap_or("")
.split('\t')
.filter(|k| !k.is_empty())
.collect();
if keys.is_empty() {
let _ = stream.write_all(b"{\"ok\":false,\"error\":\"no keys\"}\n");
} else {
let n = keys.len();
match crate::counters::record_search_hits(&keys) {
Ok(()) => {
let msg = format!("{{\"ok\":true,\"recorded\":{}}}\n", n);
let _ = stream.write_all(msg.as_bytes());
}
Err(e) => {
let msg = format!("{{\"ok\":false,\"error\":\"{}\"}}\n",
e.replace('"', "'"));
let _ = stream.write_all(msg.as_bytes());
}
}
}
}
cmd if cmd.starts_with("run-agent ") => { cmd if cmd.starts_with("run-agent ") => {
let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); let parts: Vec<&str> = cmd.splitn(3, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay"); let agent_type = parts.get(1).unwrap_or(&"replay");

View file

@ -275,8 +275,17 @@ pub fn format_rename_candidates(store: &Store, count: usize) -> (Vec<String>, St
.map(|(k, n)| (k.as_str(), n)) .map(|(k, n)| (k.as_str(), n))
.collect(); .collect();
// Least-recently visited first — naturally prioritizes unseen nodes // Deprioritize nodes actively found by search — renaming them would
candidates.sort_by_key(|(key, _)| store.last_visited(key, "rename")); // break working queries. Sort by: search hits (ascending), then
// least-recently visited. Nodes with many hits sink to the bottom.
let hit_counts = crate::counters::all_search_hits();
let hit_map: std::collections::HashMap<&str, u64> = hit_counts.iter()
.map(|(k, v)| (k.as_str(), *v))
.collect();
candidates.sort_by_key(|(key, _)| {
let hits = hit_map.get(key).copied().unwrap_or(0);
(hits, store.last_visited(key, "rename"))
});
candidates.truncate(count); candidates.truncate(count);
let keys: Vec<String> = candidates.iter().map(|(k, _)| k.to_string()).collect(); let keys: Vec<String> = candidates.iter().map(|(k, _)| k.to_string()).collect();

View file

@ -324,6 +324,14 @@ fn main() {
} }
print!("{}", result_output); print!("{}", result_output);
// Record search hits with daemon (fire-and-forget)
let hit_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
if debug { println!("[memory-search] recording {} search hits", hit_keys.len()); }
match poc_memory::agents::daemon::rpc_record_hits(&hit_keys) {
Ok(()) => { if debug { println!("[memory-search] hits recorded"); } }
Err(e) => { if debug { println!("[memory-search] hit recording failed: {}", e); } }
}
// Clean up stale state files (opportunistic) // Clean up stale state files (opportunistic)
cleanup_stale_files(&state_dir, Duration::from_secs(86400)); cleanup_stale_files(&state_dir, Duration::from_secs(86400));
} }