First use case: search hit tracking for rename protection. Nodes that memory-search actively finds shouldn't be renamed. The counters module provides increment/read/decay operations backed by redb (pure Rust, ACID, no C deps). Next step: wire into the poc-memory daemon via RPC so the daemon owns the DB exclusively and memory-search sends hits via RPC. Also reverts the JSONL search-hits approach in favor of this.
116 lines
3.9 KiB
Rust
116 lines
3.9 KiB
Rust
// counters.rs — persistent counters backed by redb
|
|
//
|
|
// Tracks search hits, visit counts, and other per-key metrics that
|
|
// need fast increment/read without loading the full capnp store.
|
|
//
|
|
// Tables:
|
|
// search_hits: key → u64 (how often memory-search found this node)
|
|
// last_hit_ts: key → i64 (unix timestamp of last search hit)
|
|
|
|
use redb::{Database, ReadableTable, TableDefinition};
|
|
use std::path::PathBuf;
|
|
|
|
// redb table "search_hits": node key → u64 count of how often memory-search found the node.
const SEARCH_HITS: TableDefinition<&str, u64> = TableDefinition::new("search_hits");
|
|
// redb table "last_hit_ts": node key → i64 unix timestamp of the most recent search hit.
const LAST_HIT_TS: TableDefinition<&str, i64> = TableDefinition::new("last_hit_ts");
|
|
|
|
fn db_path() -> PathBuf {
|
|
crate::config::get().data_dir.join("counters.redb")
|
|
}
|
|
|
|
/// Open (or create) the counters database.
|
|
pub fn open() -> Result<Database, String> {
|
|
Database::create(db_path()).map_err(|e| format!("open counters db: {}", e))
|
|
}
|
|
|
|
/// Increment search hit count for a set of keys.
|
|
pub fn record_search_hits(keys: &[&str]) -> Result<(), String> {
|
|
let db = open()?;
|
|
let ts = chrono::Utc::now().timestamp();
|
|
let txn = db.begin_write().map_err(|e| format!("begin write: {}", e))?;
|
|
{
|
|
let mut hits = txn.open_table(SEARCH_HITS).map_err(|e| format!("open table: {}", e))?;
|
|
let mut ts_table = txn.open_table(LAST_HIT_TS).map_err(|e| format!("open table: {}", e))?;
|
|
for key in keys {
|
|
let count = hits.get(*key).map_err(|e| format!("get: {}", e))?
|
|
.map(|v| v.value())
|
|
.unwrap_or(0);
|
|
hits.insert(*key, count + 1).map_err(|e| format!("insert: {}", e))?;
|
|
ts_table.insert(*key, ts).map_err(|e| format!("insert ts: {}", e))?;
|
|
}
|
|
}
|
|
txn.commit().map_err(|e| format!("commit: {}", e))?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Get search hit count for a key.
|
|
pub fn search_hit_count(key: &str) -> u64 {
|
|
let db = match open() {
|
|
Ok(db) => db,
|
|
Err(_) => return 0,
|
|
};
|
|
let txn = match db.begin_read() {
|
|
Ok(t) => t,
|
|
Err(_) => return 0,
|
|
};
|
|
let table = match txn.open_table(SEARCH_HITS) {
|
|
Ok(t) => t,
|
|
Err(_) => return 0,
|
|
};
|
|
table.get(key).ok().flatten().map(|v| v.value()).unwrap_or(0)
|
|
}
|
|
|
|
/// Get all search hit counts (for rename agent).
|
|
/// Returns keys sorted by count descending.
|
|
pub fn all_search_hits() -> Vec<(String, u64)> {
|
|
let db = match open() {
|
|
Ok(db) => db,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
let txn = match db.begin_read() {
|
|
Ok(t) => t,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
let table = match txn.open_table(SEARCH_HITS) {
|
|
Ok(t) => t,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
let mut results: Vec<(String, u64)> = match table.iter() {
|
|
Ok(iter) => iter
|
|
.flatten()
|
|
.map(|(k, v)| (k.value().to_string(), v.value()))
|
|
.collect(),
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
results.sort_by(|a, b| b.1.cmp(&a.1));
|
|
results
|
|
}
|
|
|
|
/// Decay all counters by a factor (e.g. 0.9 = 10% decay).
|
|
/// Removes entries that drop to zero.
|
|
pub fn decay_all(factor: f64) -> Result<usize, String> {
|
|
let db = open()?;
|
|
let txn = db.begin_write().map_err(|e| format!("begin write: {}", e))?;
|
|
let mut removed = 0;
|
|
{
|
|
let mut table = txn.open_table(SEARCH_HITS).map_err(|e| format!("open table: {}", e))?;
|
|
|
|
// Collect keys first to avoid borrow conflict
|
|
let entries: Vec<(String, u64)> = table.iter()
|
|
.map_err(|e| format!("iter: {}", e))?
|
|
.flatten()
|
|
.map(|(k, v)| (k.value().to_string(), v.value()))
|
|
.collect();
|
|
|
|
for (key, count) in entries {
|
|
let new_count = (count as f64 * factor) as u64;
|
|
if new_count == 0 {
|
|
table.remove(key.as_str()).ok();
|
|
removed += 1;
|
|
} else {
|
|
table.insert(key.as_str(), new_count).ok();
|
|
}
|
|
}
|
|
}
|
|
txn.commit().map_err(|e| format!("commit: {}", e))?;
|
|
Ok(removed)
|
|
}
|