From f56fc3a7c7b14e99adac4e22b4ee64701eca59fa Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 12 Apr 2026 20:27:42 -0400 Subject: [PATCH] locks: add process-wide lock hold time tracking TrackedMutex and TrackedRwLock wrappers that record hold durations by source location using #[track_caller]. Stats written to ~/.consciousness/lock-stats.json every second, sorted by max hold time. Re-exported as crate::Mutex so all locks are instrumented. To disable, swap the re-export back to tokio::sync::Mutex. Co-Authored-By: Proof of Concept --- src/agent/mod.rs | 12 +- src/agent/tools/lsp.rs | 2 +- src/agent/tools/mcp_client.rs | 2 +- src/agent/tools/memory.rs | 2 +- src/hippocampus/store/persist.rs | 6 +- src/lib.rs | 12 ++ src/locks.rs | 235 +++++++++++++++++++++++++++++++ src/mind/mod.rs | 30 +++- src/mind/subconscious.rs | 2 +- 9 files changed, 286 insertions(+), 17 deletions(-) create mode 100644 src/locks.rs diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 532a659..acf513c 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -141,8 +141,8 @@ pub struct Agent { pub app_config: crate::config::AppConfig, pub prompt_file: String, pub session_id: String, - pub context: tokio::sync::Mutex, - pub state: tokio::sync::Mutex, + pub context: crate::Mutex, + pub state: crate::Mutex, } /// Mutable agent state — behind its own mutex. 
@@ -218,8 +218,8 @@ impl Agent { app_config, prompt_file, session_id, - context: tokio::sync::Mutex::new(context), - state: tokio::sync::Mutex::new(AgentState { + context: crate::Mutex::new(context), + state: crate::Mutex::new(AgentState { tools: agent_tools, mcp_tools: McpToolAccess::All, last_prompt_tokens: 0, @@ -255,8 +255,8 @@ impl Agent { app_config: self.app_config.clone(), prompt_file: self.prompt_file.clone(), session_id: self.session_id.clone(), - context: tokio::sync::Mutex::new(ctx), - state: tokio::sync::Mutex::new(AgentState { + context: crate::Mutex::new(ctx), + state: crate::Mutex::new(AgentState { tools, mcp_tools: McpToolAccess::None, last_prompt_tokens: 0, diff --git a/src/agent/tools/lsp.rs b/src/agent/tools/lsp.rs index 0111a46..141290a 100644 --- a/src/agent/tools/lsp.rs +++ b/src/agent/tools/lsp.rs @@ -123,7 +123,7 @@ fn find_project_root(file_path: &str) -> Option { const IDLE_TIMEOUT_SECS: u64 = 600; use std::sync::OnceLock; -use tokio::sync::Mutex as TokioMutex; +use crate::Mutex as TokioMutex; struct Registry { configs: Vec, diff --git a/src/agent/tools/mcp_client.rs b/src/agent/tools/mcp_client.rs index acdb095..78c06f8 100644 --- a/src/agent/tools/mcp_client.rs +++ b/src/agent/tools/mcp_client.rs @@ -10,7 +10,7 @@ use serde_json::json; use std::sync::OnceLock; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; -use tokio::sync::Mutex as TokioMutex; +use crate::Mutex as TokioMutex; #[derive(Debug, Clone)] pub struct McpTool { diff --git a/src/agent/tools/memory.rs b/src/agent/tools/memory.rs index c606c68..ae8081b 100644 --- a/src/agent/tools/memory.rs +++ b/src/agent/tools/memory.rs @@ -20,7 +20,7 @@ fn get_f64(args: &serde_json::Value, name: &str) -> Result { args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name)) } -async fn cached_store() -> Result>> { +async fn cached_store() -> Result>> { Store::cached().await.map_err(|e| 
anyhow::anyhow!("{}", e)) } diff --git a/src/hippocampus/store/persist.rs b/src/hippocampus/store/persist.rs index 2af3983..23eff15 100644 --- a/src/hippocampus/store/persist.rs +++ b/src/hippocampus/store/persist.rs @@ -21,16 +21,16 @@ use std::path::Path; use std::sync::Arc; /// Process-global cached store. Reloads only when log files change. -static CACHED_STORE: tokio::sync::OnceCell>> = +static CACHED_STORE: tokio::sync::OnceCell>> = tokio::sync::OnceCell::const_new(); impl Store { /// Get or create the process-global cached store. /// Reloads from disk if log files have changed since last load. - pub async fn cached() -> Result>, String> { + pub async fn cached() -> Result>, String> { let store = CACHED_STORE.get_or_try_init(|| async { let s = Store::load()?; - Ok::<_, String>(Arc::new(tokio::sync::Mutex::new(s))) + Ok::<_, String>(Arc::new(crate::Mutex::new(s))) }).await?; { let mut guard = store.lock().await; diff --git a/src/lib.rs b/src/lib.rs index 06acbf6..70dc645 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(async_fn_track_caller)] + // consciousness — unified crate for memory, agents, and subconscious processes // // thought/ — shared cognitive substrate (tools, context, memory ops) @@ -47,6 +49,16 @@ pub mod session; // Shared utilities pub mod util; +// Lock hold time tracking +pub mod locks; + +// Re-export tracked locks as the default — swap to tokio::sync to disable tracking +pub use locks::TrackedMutex as Mutex; +pub use locks::TrackedMutexGuard as MutexGuard; +pub use locks::TrackedRwLock as RwLock; +pub use locks::TrackedRwLockReadGuard as RwLockReadGuard; +pub use locks::TrackedRwLockWriteGuard as RwLockWriteGuard; + // CLI handlers pub mod cli; diff --git a/src/locks.rs b/src/locks.rs new file mode 100644 index 0000000..dda4cb2 --- /dev/null +++ b/src/locks.rs @@ -0,0 +1,235 @@ +// Lock hold time tracking +// +// Wrappers around tokio::sync primitives that track how long locks are held, +// keyed by source location. 
Use `lock_stats()` to get a snapshot. + +use std::collections::HashMap; +use std::panic::Location; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::OnceLock; +use std::time::Instant; + +use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +// ── Stats Registry ───────────────────────────────────────────── + +struct LocationStats { + count: AtomicU64, + total_ns: AtomicU64, + max_ns: AtomicU64, +} + +impl LocationStats { + fn new() -> Self { + Self { + count: AtomicU64::new(0), + total_ns: AtomicU64::new(0), + max_ns: AtomicU64::new(0), + } + } + + fn record(&self, duration_ns: u64) { + self.count.fetch_add(1, Ordering::Relaxed); + self.total_ns.fetch_add(duration_ns, Ordering::Relaxed); + // Update max using CAS loop + let mut current = self.max_ns.load(Ordering::Relaxed); + while duration_ns > current { + match self.max_ns.compare_exchange_weak( + current, duration_ns, Ordering::Relaxed, Ordering::Relaxed + ) { + Ok(_) => break, + Err(c) => current = c, + } + } + } + + fn snapshot(&self) -> LockStats { + let count = self.count.load(Ordering::Relaxed); + let total_ns = self.total_ns.load(Ordering::Relaxed); + let max_ns = self.max_ns.load(Ordering::Relaxed); + LockStats { + count, + total_ns, + max_ns, + avg_ns: if count > 0 { total_ns / count } else { 0 }, + } + } +} + +/// Stats for a single lock location. 
+#[derive(Clone, Debug)]
+pub struct LockStats {
+    pub count: u64,
+    pub total_ns: u64,
+    pub max_ns: u64,
+    pub avg_ns: u64,
+}
+
+type StatsMap = std::sync::Mutex<HashMap<&'static Location<'static>, LocationStats>>;
+
+fn stats_map() -> &'static StatsMap {
+    static MAP: OnceLock<StatsMap> = OnceLock::new();
+    MAP.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
+}
+
+fn record_hold_time(loc: &'static Location<'static>, duration_ns: u64) {
+    let map = stats_map().lock().unwrap();
+    if let Some(stats) = map.get(&loc) {
+        stats.record(duration_ns);
+        return;
+    }
+    drop(map);
+
+    // First time seeing this location — re-take the lock and insert.
+    // entry() handles the race where another thread inserted meanwhile.
+    let mut map = stats_map().lock().unwrap();
+    let stats = map.entry(loc).or_insert_with(LocationStats::new);
+    stats.record(duration_ns);
+}
+
+/// Get a snapshot of all lock stats, sorted by max hold time (descending).
+pub fn lock_stats() -> Vec<(String, LockStats)> {
+    let map = stats_map().lock().unwrap();
+    let mut stats: Vec<_> = map.iter()
+        .map(|(loc, s)| (format!("{}:{}", loc.file(), loc.line()), s.snapshot()))
+        .collect();
+    stats.sort_by(|a, b| b.1.max_ns.cmp(&a.1.max_ns));
+    stats
+}
+
+/// Reset all lock stats.
+pub fn reset_lock_stats() {
+    let mut map = stats_map().lock().unwrap();
+    map.clear();
+}
+
+// ── TrackedMutex ───────────────────────────────────────────────
+
+/// A Mutex wrapper that tracks hold times by caller location.
+pub struct TrackedMutex<T> {
+    inner: Mutex<T>,
+}
+
+impl<T> TrackedMutex<T> {
+    pub fn new(value: T) -> Self {
+        Self { inner: Mutex::new(value) }
+    }
+
+    #[track_caller]
+    pub async fn lock(&self) -> TrackedMutexGuard<'_, T> {
+        let location = Location::caller();
+        let guard = self.inner.lock().await;
+        TrackedMutexGuard {
+            guard,
+            acquired_at: Instant::now(),
+            location,
+        }
+    }
+
+    #[track_caller]
+    pub fn try_lock(&self) -> Result<TrackedMutexGuard<'_, T>, tokio::sync::TryLockError> {
+        let location = Location::caller();
+        let guard = self.inner.try_lock()?;
+        Ok(TrackedMutexGuard {
+            guard,
+            acquired_at: Instant::now(),
+            location,
+        })
+    }
+}
+
+pub struct TrackedMutexGuard<'a, T> {
+    guard: MutexGuard<'a, T>,
+    acquired_at: Instant,
+    location: &'static Location<'static>,
+}
+
+impl<T> Drop for TrackedMutexGuard<'_, T> {
+    fn drop(&mut self) {
+        let duration = self.acquired_at.elapsed();
+        record_hold_time(self.location, duration.as_nanos() as u64);
+    }
+}
+
+impl<T> std::ops::Deref for TrackedMutexGuard<'_, T> {
+    type Target = T;
+    fn deref(&self) -> &T { &self.guard }
+}
+
+impl<T> std::ops::DerefMut for TrackedMutexGuard<'_, T> {
+    fn deref_mut(&mut self) -> &mut T { &mut self.guard }
+}
+
+// ── TrackedRwLock ──────────────────────────────────────────────
+
+/// An RwLock wrapper that tracks hold times by caller location.
+pub struct TrackedRwLock { + inner: RwLock, +} + +impl TrackedRwLock { + pub fn new(value: T) -> Self { + Self { inner: RwLock::new(value) } + } + + #[track_caller] + pub async fn read(&self) -> TrackedRwLockReadGuard<'_, T> { + let location = Location::caller(); + let guard = self.inner.read().await; + TrackedRwLockReadGuard { + guard, + acquired_at: Instant::now(), + location, + } + } + + #[track_caller] + pub async fn write(&self) -> TrackedRwLockWriteGuard<'_, T> { + let location = Location::caller(); + let guard = self.inner.write().await; + TrackedRwLockWriteGuard { + guard, + acquired_at: Instant::now(), + location, + } + } +} + +pub struct TrackedRwLockReadGuard<'a, T> { + guard: RwLockReadGuard<'a, T>, + acquired_at: Instant, + location: &'static Location<'static>, +} + +impl Drop for TrackedRwLockReadGuard<'_, T> { + fn drop(&mut self) { + let duration = self.acquired_at.elapsed(); + record_hold_time(self.location, duration.as_nanos() as u64); + } +} + +impl std::ops::Deref for TrackedRwLockReadGuard<'_, T> { + type Target = T; + fn deref(&self) -> &T { &self.guard } +} + +pub struct TrackedRwLockWriteGuard<'a, T> { + guard: RwLockWriteGuard<'a, T>, + acquired_at: Instant, + location: &'static Location<'static>, +} + +impl Drop for TrackedRwLockWriteGuard<'_, T> { + fn drop(&mut self) { + let duration = self.acquired_at.elapsed(); + record_hold_time(self.location, duration.as_nanos() as u64); + } +} + +impl std::ops::Deref for TrackedRwLockWriteGuard<'_, T> { + type Target = T; + fn deref(&self) -> &T { &self.guard } +} + +impl std::ops::DerefMut for TrackedRwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { &mut self.guard } +} diff --git a/src/mind/mod.rs b/src/mind/mod.rs index a11a881..aa4162a 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -268,8 +268,8 @@ pub struct Mind { pub agent: Arc, pub shared: Arc, pub config: SessionConfig, - pub subconscious: Arc>, - pub unconscious: Arc>, + pub subconscious: Arc>, + pub unconscious: 
Arc>, turn_tx: mpsc::Sender<(Result, StreamTarget)>, turn_watch: tokio::sync::watch::Sender, /// Signals conscious activity to the unconscious loop. @@ -309,10 +309,10 @@ impl Mind { sup.load_config(); sup.ensure_running(); - let subconscious = Arc::new(tokio::sync::Mutex::new(Subconscious::new())); + let subconscious = Arc::new(crate::Mutex::new(Subconscious::new())); subconscious.lock().await.init_output_tool(subconscious.clone()); - let unconscious = Arc::new(tokio::sync::Mutex::new(Unconscious::new())); + let unconscious = Arc::new(crate::Mutex::new(Unconscious::new())); // Spawn the unconscious loop on its own task if !config.no_agents { @@ -584,6 +584,28 @@ impl Mind { mut input_rx: tokio::sync::mpsc::UnboundedReceiver, mut turn_rx: mpsc::Receiver<(Result, StreamTarget)>, ) { + // Spawn lock stats logger + tokio::spawn(async { + let path = dirs::home_dir().unwrap_or_default() + .join(".consciousness/lock-stats.json"); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + loop { + interval.tick().await; + let stats = crate::locks::lock_stats(); + if stats.is_empty() { continue; } + let json: Vec = stats.iter() + .map(|(loc, s)| serde_json::json!({ + "location": loc, + "count": s.count, + "total_ms": s.total_ns as f64 / 1_000_000.0, + "avg_ms": s.avg_ns as f64 / 1_000_000.0, + "max_ms": s.max_ns as f64 / 1_000_000.0, + })) + .collect(); + let _ = std::fs::write(&path, serde_json::to_string_pretty(&json).unwrap_or_default()); + } + }); + let mut bg_rx = self.bg_rx.lock().unwrap().take() .expect("Mind::run() called twice"); let mut sub_handle: Option> = None; diff --git a/src/mind/subconscious.rs b/src/mind/subconscious.rs index 144e6bc..7266e1a 100644 --- a/src/mind/subconscious.rs +++ b/src/mind/subconscious.rs @@ -410,7 +410,7 @@ impl Subconscious { /// Late-init: push the output tool onto each agent's tool list. /// Called after Subconscious is wrapped in Arc> so the /// closure can capture a reference back. 
- pub fn init_output_tool(&mut self, self_arc: std::sync::Arc>) { + pub fn init_output_tool(&mut self, self_arc: std::sync::Arc>) { for agent in &mut self.agents { let Some(ref mut auto) = agent.auto else { continue }; let sub = self_arc.clone();