centralize memory store interface in hippocampus/mod.rs

This commit is contained in:
Kent Overstreet 2026-04-13 17:44:33 -04:00
parent 063cf031d3
commit 5db00e083f
8 changed files with 899 additions and 723 deletions

View file

@ -6,136 +6,25 @@
#![allow(unused_variables)] // macro-generated args for no-param tools
use anyhow::{Context, Result};
use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;
use crate::hippocampus::{access, memory_rpc, StoreAccess};
use crate::store::Store;
// Re-export typed API from hippocampus for backward compatibility
pub use crate::hippocampus::{
memory_render, memory_write, memory_search, memory_link_set, memory_link_add,
memory_delete, memory_history, memory_weight_set, memory_rename, memory_supersede,
memory_query, memory_links,
journal_tail, journal_new, journal_update,
graph_topology, graph_health, graph_communities, graph_normalize_strengths,
graph_link_impact, graph_hubs, graph_trace,
set_store, socket_path,
};
// ── Store access ───────────────────────────────────────────────
/// Daemon's store (eager init) or client's fallback local store.
static STORE_ACCESS: OnceLock<Option<Arc<crate::Mutex<Store>>>> = OnceLock::new();
// Client's socket connection (thread-local for lock-free access).
thread_local! {
static SOCKET_CONN: RefCell<Option<SocketConn>> = const { RefCell::new(None) };
}
/// How we access the memory store.
enum StoreAccess {
Daemon(Arc<crate::Mutex<Store>>), // Direct store access
Client, // Socket to daemon (in thread-local)
None(String), // Error: couldn't get access
}
/// Set the global store handle. Call once at daemon startup (eager init).
pub fn set_store(store: Arc<crate::Mutex<Store>>) {
STORE_ACCESS.set(Some(store)).ok();
}
/// Get store access: daemon's store, socket, or local fallback.
fn access() -> StoreAccess {
// Daemon: already set via set_store()
if let Some(Some(store)) = STORE_ACCESS.get() {
return StoreAccess::Daemon(store.clone());
}
// Client: check if socket already cached in thread-local
let have_socket = SOCKET_CONN.with(|cell| cell.borrow().is_some());
if have_socket {
return StoreAccess::Client;
}
// No socket cached, try connecting
if let Ok(conn) = SocketConn::connect() {
SOCKET_CONN.with(|cell| *cell.borrow_mut() = Some(conn));
return StoreAccess::Client;
}
// Socket failed - try local store as fallback (cached in STORE_ACCESS)
let store_opt = STORE_ACCESS.get_or_init(|| {
Store::load().ok().map(|s| Arc::new(crate::Mutex::new(s)))
});
match store_opt {
Some(store) => StoreAccess::Daemon(store.clone()),
None => StoreAccess::None("could not connect to daemon or open store locally".into()),
}
}
pub fn socket_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/mcp.sock")
}
struct SocketConn {
reader: std::io::BufReader<std::os::unix::net::UnixStream>,
writer: std::io::BufWriter<std::os::unix::net::UnixStream>,
next_id: u64,
}
impl SocketConn {
fn connect() -> Result<Self> {
use std::os::unix::net::UnixStream;
use std::io::{BufRead, BufReader, BufWriter, Write};
let path = socket_path();
let stream = UnixStream::connect(&path)?;
let mut reader = BufReader::new(stream.try_clone()?);
let mut writer = BufWriter::new(stream);
// Initialize MCP connection
let init = serde_json::json!({"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {"protocolVersion": "2024-11-05", "capabilities": {},
"clientInfo": {"name": "forward", "version": "0.1"}}});
writeln!(writer, "{}", init)?;
writer.flush()?;
let mut buf = String::new();
reader.read_line(&mut buf)?;
Ok(Self { reader, writer, next_id: 1 })
}
fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result<String> {
use std::io::{BufRead, Write};
self.next_id += 1;
let call = serde_json::json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call",
"params": {"name": tool_name, "arguments": args}});
writeln!(self.writer, "{}", call)?;
self.writer.flush()?;
let mut buf = String::new();
self.reader.read_line(&mut buf)?;
let resp: serde_json::Value = serde_json::from_str(&buf)?;
if let Some(err) = resp.get("error") {
anyhow::bail!("daemon error: {}", err);
}
let result = resp.get("result").cloned().unwrap_or(serde_json::json!({}));
let text = result.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
Ok(text.to_string())
}
}
/// Forward a tool call to the daemon via socket.
/// Only valid when access() returns Client.
fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
SOCKET_CONN.with(|cell| {
let mut conn = cell.borrow_mut();
let conn = conn.as_mut().expect("access() returned Client but SOCKET_CONN is None");
conn.call(tool_name, &args)
})
}
// ── Helpers ────────────────────────────────────────────────────
// ── Macro for generating tool wrappers ─────────────────────────
//
// memory_tool!(name, mut, arg1: [str], arg2: [Option<bool>])
// - mut/ref for store mutability
// - generates jsonargs_* (internal, JSON args) and public typed API
fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> {
args.get(name).and_then(|v| v.as_str()).context(format!("{} is required", name))
@ -153,12 +42,6 @@ async fn get_provenance(agent: &Option<std::sync::Arc<crate::agent::Agent>>) ->
}
}
// ── Macro for generating tool wrappers ─────────────────────────
//
// memory_tool!(name, mut, arg1: [str], arg2: [Option<bool>])
// - mut/ref for store mutability
// - generates jsonargs_* (internal, JSON args) and public typed API
macro_rules! memory_tool {
// ── Helper rules (must come first) ─────────────────────────────
@ -249,10 +132,10 @@ macro_rules! memory_tool {
// Call hippocampus with appropriate mutability
(@call mut, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
crate::hippocampus::$name(&mut $store, $prov $(, $arg)*)
crate::hippocampus::local::$name(&mut $store, $prov $(, $arg)*)
};
(@call ref, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
crate::hippocampus::$name(&$store, $prov $(, $arg)*)
crate::hippocampus::local::$name(&$store, $prov $(, $arg)*)
};
// ── Main rules ─────────────────────────────────────────────────
@ -284,29 +167,6 @@ macro_rules! memory_tool {
StoreAccess::None(err) => anyhow::bail!("{}", err),
}
}
pub async fn $name(agent: Option<&crate::agent::Agent> $($(, $arg: memory_tool!(@param_type $($typ)+))*)?) -> Result<$ret> {
let prov = match agent {
Some(a) => a.state.lock().await.provenance.clone(),
None => "manual".to_string(),
};
match access() {
StoreAccess::Daemon(arc) => {
#[allow(unused_mut)]
let mut store = arc.lock().await;
memory_tool!(@call $m, $name, store, &prov $($(, $arg)*)?)
}
StoreAccess::Client => {
#[allow(unused_mut)]
let mut map = serde_json::Map::new();
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
let json = memory_rpc(stringify!($name), serde_json::Value::Object(map))?;
memory_tool!(@deserialize $ret, json)
}
StoreAccess::None(err) => anyhow::bail!("{}", err),
}
}
}
};
}
@ -325,14 +185,14 @@ memory_tool!(memory_rename, mut, old_key: [str], new_key: [str]);
memory_tool!(memory_supersede, mut, old_key: [str], new_key: [str], reason: [Option<&str>]);
memory_tool!(memory_query, ref, query: [str], format: [Option<&str>]);
// Re-export LinkInfo for callers
pub use crate::hippocampus::LinkInfo;
// Re-export types and typed API from hippocampus
pub use crate::hippocampus::local::LinkInfo;
memory_tool!(memory_links, ref -> Vec<LinkInfo>, key: [str]);
// ── Journal tools ──────────────────────────────────────────────
pub use crate::hippocampus::JournalEntry;
pub use crate::hippocampus::local::JournalEntry;
memory_tool!(journal_tail, ref -> Vec<JournalEntry>, count: [Option<u64>], level: [Option<u64>], after: [Option<&str>]);
memory_tool!(journal_new, mut, name: [str], title: [str], body: [str], level: [Option<i64>]);