consciousness/src/agent/tools/memory.rs
Kent Overstreet 598f0112a4 memory_links: return typed Vec<LinkInfo> with node weights
- hippocampus::memory_links now returns Vec<LinkInfo> with key,
  link_strength, and node_weight for each neighbor
- Unified memory_tool! macro: mut/ref as token, single main rule
- All tools use serde serialize/deserialize for RPC consistency
- jsonargs handlers now work in client mode (RPC to daemon)
- cli/graph.rs formats LinkInfo for display

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-13 15:12:06 -04:00

513 lines
22 KiB
Rust

// tools/memory.rs — Native memory graph operations
//
// Daemon: calls set_store() at startup for direct store access.
// Clients: lazy init tries socket, falls back to local store.
#![allow(unused_variables)] // macro-generated args for no-param tools
use anyhow::{Context, Result};
use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use crate::store::Store;
// ── Store access ───────────────────────────────────────────────
/// Daemon's store (eager init) or client's fallback local store.
static STORE_ACCESS: OnceLock<Option<Arc<crate::Mutex<Store>>>> = OnceLock::new();
// Client's socket connection (thread-local for lock-free access).
thread_local! {
static SOCKET_CONN: RefCell<Option<SocketConn>> = const { RefCell::new(None) };
}
/// How we access the memory store.
enum StoreAccess {
Daemon(Arc<crate::Mutex<Store>>), // Direct store access
Client, // Socket to daemon (in thread-local)
None(String), // Error: couldn't get access
}
/// Set the global store handle. Call once at daemon startup (eager init).
pub fn set_store(store: Arc<crate::Mutex<Store>>) {
STORE_ACCESS.set(Some(store)).ok();
}
/// Get store access: daemon's store, socket, or local fallback.
fn access() -> StoreAccess {
// Daemon: already set via set_store()
if let Some(Some(store)) = STORE_ACCESS.get() {
return StoreAccess::Daemon(store.clone());
}
// Client: check if socket already cached in thread-local
let have_socket = SOCKET_CONN.with(|cell| cell.borrow().is_some());
if have_socket {
return StoreAccess::Client;
}
// No socket cached, try connecting
if let Ok(conn) = SocketConn::connect() {
SOCKET_CONN.with(|cell| *cell.borrow_mut() = Some(conn));
return StoreAccess::Client;
}
// Socket failed - try local store as fallback (cached in STORE_ACCESS)
let store_opt = STORE_ACCESS.get_or_init(|| {
Store::load().ok().map(|s| Arc::new(crate::Mutex::new(s)))
});
match store_opt {
Some(store) => StoreAccess::Daemon(store.clone()),
None => StoreAccess::None("could not connect to daemon or open store locally".into()),
}
}
pub fn socket_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_default()
.join(".consciousness/mcp.sock")
}
struct SocketConn {
reader: std::io::BufReader<std::os::unix::net::UnixStream>,
writer: std::io::BufWriter<std::os::unix::net::UnixStream>,
next_id: u64,
}
impl SocketConn {
fn connect() -> Result<Self> {
use std::os::unix::net::UnixStream;
use std::io::{BufRead, BufReader, BufWriter, Write};
let path = socket_path();
let stream = UnixStream::connect(&path)?;
let mut reader = BufReader::new(stream.try_clone()?);
let mut writer = BufWriter::new(stream);
// Initialize MCP connection
let init = serde_json::json!({"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {"protocolVersion": "2024-11-05", "capabilities": {},
"clientInfo": {"name": "forward", "version": "0.1"}}});
writeln!(writer, "{}", init)?;
writer.flush()?;
let mut buf = String::new();
reader.read_line(&mut buf)?;
Ok(Self { reader, writer, next_id: 1 })
}
fn call(&mut self, tool_name: &str, args: &serde_json::Value) -> Result<String> {
use std::io::{BufRead, Write};
self.next_id += 1;
let call = serde_json::json!({"jsonrpc": "2.0", "id": self.next_id, "method": "tools/call",
"params": {"name": tool_name, "arguments": args}});
writeln!(self.writer, "{}", call)?;
self.writer.flush()?;
let mut buf = String::new();
self.reader.read_line(&mut buf)?;
let resp: serde_json::Value = serde_json::from_str(&buf)?;
if let Some(err) = resp.get("error") {
anyhow::bail!("daemon error: {}", err);
}
let result = resp.get("result").cloned().unwrap_or(serde_json::json!({}));
let text = result.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
Ok(text.to_string())
}
}
/// Forward a tool call to the daemon via socket.
/// Only valid when access() returns Client.
fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
SOCKET_CONN.with(|cell| {
let mut conn = cell.borrow_mut();
let conn = conn.as_mut().expect("access() returned Client but SOCKET_CONN is None");
conn.call(tool_name, &args)
})
}
// ── Helpers ────────────────────────────────────────────────────
fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> {
args.get(name).and_then(|v| v.as_str()).context(format!("{} is required", name))
}
fn get_f64(args: &serde_json::Value, name: &str) -> Result<f64> {
args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name))
}
/// Get provenance from agent state, or "manual".
async fn get_provenance(agent: &Option<std::sync::Arc<crate::agent::Agent>>) -> String {
match agent {
Some(a) => a.state.lock().await.provenance.clone(),
None => "manual".to_string(),
}
}
// ── Macro for generating tool wrappers ─────────────────────────
//
// memory_tool!(name, mut, arg1: [str], arg2: [Option<bool>])
// - mut/ref for store mutability
// - generates jsonargs_* (internal, JSON args) and public typed API
macro_rules! memory_tool {
// ── Helper rules (must come first) ─────────────────────────────
// Extract from JSON
(@extract $args:ident, $name:ident, str) => {
get_str($args, stringify!($name))?
};
(@extract $args:ident, $name:ident, f32) => {
get_f64($args, stringify!($name))? as f32
};
(@extract $args:ident, $name:ident, Vec<String>) => {
$args.get(stringify!($name))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect::<Vec<_>>())
.unwrap_or_default()
};
(@extract $args:ident, $name:ident, Option<&str>) => {
$args.get(stringify!($name)).and_then(|v| v.as_str())
};
(@extract $args:ident, $name:ident, Option<bool>) => {
$args.get(stringify!($name)).and_then(|v| v.as_bool())
};
(@extract $args:ident, $name:ident, Option<u64>) => {
$args.get(stringify!($name)).and_then(|v| v.as_u64())
};
(@extract $args:ident, $name:ident, Option<i64>) => {
$args.get(stringify!($name)).and_then(|v| v.as_i64())
};
(@extract $args:ident, $name:ident, Option<usize>) => {
$args.get(stringify!($name)).and_then(|v| v.as_u64()).map(|v| v as usize)
};
(@extract $args:ident, $name:ident, Option<u32>) => {
$args.get(stringify!($name)).and_then(|v| v.as_u64()).map(|v| v as u32)
};
(@extract $args:ident, $name:ident, Option<f64>) => {
$args.get(stringify!($name)).and_then(|v| v.as_f64())
};
// Parameter types for function signatures
(@param_type str) => { &str };
(@param_type f32) => { f32 };
(@param_type Vec<String>) => { Vec<String> };
(@param_type Option<&str>) => { Option<&str> };
(@param_type Option<bool>) => { Option<bool> };
(@param_type Option<u64>) => { Option<u64> };
(@param_type Option<i64>) => { Option<i64> };
(@param_type Option<usize>) => { Option<usize> };
(@param_type Option<u32>) => { Option<u32> };
(@param_type Option<f64>) => { Option<f64> };
// Serialize result for jsonargs
(@serialize $t:ty, $result:expr) => { serde_json::to_string(&$result)? };
// Deserialize RPC response
(@deserialize $t:ty, $json:expr) => { serde_json::from_str(&$json).map_err(|e| anyhow::anyhow!("{}", e)) };
// Serialize to JSON for RPC
(@insert_json $map:ident, $name:ident, str) => {
$map.insert(stringify!($name).into(), serde_json::json!($name));
};
(@insert_json $map:ident, $name:ident, f32) => {
$map.insert(stringify!($name).into(), serde_json::json!($name));
};
(@insert_json $map:ident, $name:ident, Vec<String>) => {
$map.insert(stringify!($name).into(), serde_json::json!($name));
};
(@insert_json $map:ident, $name:ident, Option<&str>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<bool>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<u64>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<i64>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<usize>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<u32>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
(@insert_json $map:ident, $name:ident, Option<f64>) => {
if let Some(v) = $name { $map.insert(stringify!($name).into(), serde_json::json!(v)); }
};
// Call hippocampus with appropriate mutability
(@call mut, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
crate::hippocampus::$name(&mut $store, $prov $(, $arg)*)
};
(@call ref, $name:ident, $store:ident, $prov:expr $(, $arg:expr)*) => {
crate::hippocampus::$name(&$store, $prov $(, $arg)*)
};
// ── Main rules ─────────────────────────────────────────────────
// Shorthand: mut/ref without return type defaults to String
($name:ident, $m:ident $(, $($arg:ident : [$($typ:tt)+]),* $(,)?)?) => {
memory_tool!($name, $m -> String $(, $($arg : [$($typ)+]),*)?);
};
// Full form with return type
($name:ident, $m:ident -> $ret:ty $(, $($arg:ident : [$($typ:tt)+]),* $(,)?)?) => {
paste::paste! {
async fn [<jsonargs_ $name>](agent: &Option<std::sync::Arc<crate::agent::Agent>>, args: &serde_json::Value) -> Result<String> {
$($(let $arg = memory_tool!(@extract args, $arg, $($typ)+);)*)?
let prov = get_provenance(agent).await;
match access() {
StoreAccess::Daemon(arc) => {
#[allow(unused_mut)]
let mut store = arc.lock().await;
let result: $ret = memory_tool!(@call $m, $name, store, &prov $($(, $arg)*)?)?;
Ok(memory_tool!(@serialize $ret, result))
}
StoreAccess::Client => {
#[allow(unused_mut)]
let mut map = serde_json::Map::new();
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
memory_rpc(stringify!($name), serde_json::Value::Object(map))
}
StoreAccess::None(err) => anyhow::bail!("{}", err),
}
}
pub async fn $name(agent: Option<&crate::agent::Agent> $($(, $arg: memory_tool!(@param_type $($typ)+))*)?) -> Result<$ret> {
let prov = match agent {
Some(a) => a.state.lock().await.provenance.clone(),
None => "manual".to_string(),
};
match access() {
StoreAccess::Daemon(arc) => {
#[allow(unused_mut)]
let mut store = arc.lock().await;
memory_tool!(@call $m, $name, store, &prov $($(, $arg)*)?)
}
StoreAccess::Client => {
#[allow(unused_mut)]
let mut map = serde_json::Map::new();
$($(memory_tool!(@insert_json map, $arg, $($typ)+);)*)?
let json = memory_rpc(stringify!($name), serde_json::Value::Object(map))?;
memory_tool!(@deserialize $ret, json)
}
StoreAccess::None(err) => anyhow::bail!("{}", err),
}
}
}
};
}
// ── Memory tools ───────────────────────────────────────────────
memory_tool!(memory_render, ref, key: [str], raw: [Option<bool>]);
memory_tool!(memory_write, mut, key: [str], content: [str]);
memory_tool!(memory_search, ref, keys: [Vec<String>], max_hops: [Option<u32>], edge_decay: [Option<f64>], min_activation: [Option<f64>], limit: [Option<usize>]);
memory_tool!(memory_link_set, mut, source: [str], target: [str], strength: [f32]);
memory_tool!(memory_link_add, mut, source: [str], target: [str]);
memory_tool!(memory_delete, mut, key: [str]);
memory_tool!(memory_history, ref, key: [str], full: [Option<bool>]);
memory_tool!(memory_weight_set, mut, key: [str], weight: [f32]);
memory_tool!(memory_rename, mut, old_key: [str], new_key: [str]);
memory_tool!(memory_supersede, mut, old_key: [str], new_key: [str], reason: [Option<&str>]);
memory_tool!(memory_query, ref, query: [str], format: [Option<&str>]);
// Re-export LinkInfo for callers
pub use crate::hippocampus::LinkInfo;
memory_tool!(memory_links, ref -> Vec<LinkInfo>, key: [str]);
// ── Journal tools ──────────────────────────────────────────────
memory_tool!(journal_tail, ref, count: [Option<u64>], level: [Option<u64>], format: [Option<&str>], after: [Option<&str>]);
memory_tool!(journal_new, mut, name: [str], title: [str], body: [str], level: [Option<i64>]);
memory_tool!(journal_update, mut, body: [str], level: [Option<i64>]);
// ── Graph tools ───────────────────────────────────────────────
memory_tool!(graph_topology, ref);
memory_tool!(graph_health, ref);
memory_tool!(graph_communities, ref, top_n: [Option<usize>], min_size: [Option<usize>]);
memory_tool!(graph_normalize_strengths, mut, apply: [Option<bool>]);
memory_tool!(graph_link_impact, ref, source: [str], target: [str]);
memory_tool!(graph_hubs, ref, count: [Option<usize>]);
memory_tool!(graph_trace, ref, key: [str]);
// ── Definitions ────────────────────────────────────────────────
pub fn memory_tools() -> [super::Tool; 19] {
use super::Tool;
macro_rules! tool {
($name:ident, $desc:expr, $params:expr) => {
Tool {
name: stringify!($name),
description: $desc,
parameters_json: $params,
handler: Arc::new(|a, v| Box::pin(async move {
paste::paste! { [<jsonargs_ $name>](&a, &v).await }
})),
}
};
}
[
tool!(memory_render, "Read a memory node's content and links.", r#"{
"type": "object",
"properties": { "key": {"type": "string"}, "raw": {"type": "boolean"} },
"required": ["key"]
}"#),
tool!(memory_write, "Create or update a memory node.", r#"{
"type": "object",
"properties": { "key": {"type": "string"}, "content": {"type": "string"} },
"required": ["key", "content"]
}"#),
tool!(memory_search, "Search via spreading activation from seed keys.", r#"{
"type": "object",
"properties": {
"keys": {"type": "array", "items": {"type": "string"}},
"max_hops": {"type": "integer"},
"edge_decay": {"type": "number"},
"min_activation": {"type": "number"},
"limit": {"type": "integer"}
},
"required": ["keys"]
}"#),
tool!(memory_links, "Show a node's neighbors with link strengths.", r#"{
"type": "object",
"properties": { "key": {"type": "string"} },
"required": ["key"]
}"#),
tool!(memory_link_set, "Set link strength between two nodes.", r#"{
"type": "object",
"properties": {
"source": {"type": "string"},
"target": {"type": "string"},
"strength": {"type": "number", "description": "0.01 to 1.0"}
},
"required": ["source", "target", "strength"]
}"#),
tool!(memory_link_add, "Add a new link between two nodes.", r#"{
"type": "object",
"properties": { "source": {"type": "string"}, "target": {"type": "string"} },
"required": ["source", "target"]
}"#),
tool!(memory_delete, "Delete a memory node.", r#"{
"type": "object",
"properties": { "key": {"type": "string"} },
"required": ["key"]
}"#),
tool!(memory_history, "Show version history for a node.", r#"{
"type": "object",
"properties": { "key": {"type": "string"}, "full": {"type": "boolean"} },
"required": ["key"]
}"#),
tool!(memory_weight_set, "Set a node's weight (0.01 to 1.0).", r#"{
"type": "object",
"properties": { "key": {"type": "string"}, "weight": {"type": "number"} },
"required": ["key", "weight"]
}"#),
tool!(memory_rename, "Rename a node key.", r#"{
"type": "object",
"properties": { "old_key": {"type": "string"}, "new_key": {"type": "string"} },
"required": ["old_key", "new_key"]
}"#),
tool!(memory_supersede, "Mark a node as superseded by another.", r#"{
"type": "object",
"properties": {
"old_key": {"type": "string"},
"new_key": {"type": "string"},
"reason": {"type": "string"}
},
"required": ["old_key", "new_key"]
}"#),
tool!(memory_query, "Run a structured query against the memory graph.", r#"{
"type": "object",
"properties": {
"query": {"type": "string"},
"format": {"type": "string", "description": "compact or full"}
},
"required": ["query"]
}"#),
tool!(graph_topology, "Show graph topology stats.", r#"{"type": "object"}"#),
tool!(graph_health, "Show graph health report.", r#"{"type": "object"}"#),
tool!(graph_hubs, "Show top hub nodes by degree.", r#"{
"type": "object",
"properties": { "count": {"type": "integer"} }
}"#),
tool!(graph_communities, "Show communities by isolation.", r#"{
"type": "object",
"properties": { "top_n": {"type": "integer"}, "min_size": {"type": "integer"} }
}"#),
tool!(graph_normalize_strengths, "Set link strengths from Jaccard similarity.", r#"{
"type": "object",
"properties": { "apply": {"type": "boolean"} }
}"#),
tool!(graph_link_impact, "Simulate adding an edge, report impact.", r#"{
"type": "object",
"properties": { "source": {"type": "string"}, "target": {"type": "string"} },
"required": ["source", "target"]
}"#),
tool!(graph_trace, "Walk temporal links from a node.", r#"{
"type": "object",
"properties": { "key": {"type": "string"} },
"required": ["key"]
}"#),
]
}
pub fn journal_tools() -> [super::Tool; 3] {
use super::Tool;
macro_rules! tool {
($name:ident, $desc:expr, $params:expr) => {
Tool {
name: stringify!($name),
description: $desc,
parameters_json: $params,
handler: Arc::new(|a, v| Box::pin(async move {
paste::paste! { [<jsonargs_ $name>](&a, &v).await }
})),
}
};
}
[
tool!(journal_tail, "Read the last N entries at a given level.", r#"{
"type": "object",
"properties": {
"count": {"type": "integer"},
"level": {"type": "integer", "description": "0=journal, 1=daily, 2=weekly, 3=monthly"},
"format": {"type": "string", "description": "compact or full"},
"after": {"type": "string", "description": "Only entries after this date (YYYY-MM-DD)"}
}
}"#),
tool!(journal_new, "Start a new journal/digest entry.", r#"{
"type": "object",
"properties": {
"name": {"type": "string"},
"title": {"type": "string"},
"body": {"type": "string"},
"level": {"type": "integer"}
},
"required": ["name", "title", "body"]
}"#),
tool!(journal_update, "Append text to the most recent entry.", r#"{
"type": "object",
"properties": {
"body": {"type": "string"},
"level": {"type": "integer"}
},
"required": ["body"]
}"#),
]
}