// Lock hold time tracking // // Wrappers around tokio::sync primitives that track how long locks are held, // keyed by source location. Use `lock_stats()` to get a snapshot. use std::collections::HashMap; use std::panic::Location; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; use std::time::Instant; use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; // ── Stats Registry ───────────────────────────────────────────── struct LocationStats { count: AtomicU64, total_ns: AtomicU64, max_ns: AtomicU64, } impl LocationStats { fn new() -> Self { Self { count: AtomicU64::new(0), total_ns: AtomicU64::new(0), max_ns: AtomicU64::new(0), } } fn record(&self, duration_ns: u64) { self.count.fetch_add(1, Ordering::Relaxed); self.total_ns.fetch_add(duration_ns, Ordering::Relaxed); // Update max using CAS loop let mut current = self.max_ns.load(Ordering::Relaxed); while duration_ns > current { match self.max_ns.compare_exchange_weak( current, duration_ns, Ordering::Relaxed, Ordering::Relaxed ) { Ok(_) => break, Err(c) => current = c, } } } fn snapshot(&self) -> LockStats { let count = self.count.load(Ordering::Relaxed); let total_ns = self.total_ns.load(Ordering::Relaxed); let max_ns = self.max_ns.load(Ordering::Relaxed); LockStats { count, total_ns, max_ns, avg_ns: if count > 0 { total_ns / count } else { 0 }, } } } /// Stats for a single lock location. #[derive(Clone, Debug)] pub struct LockStats { pub count: u64, pub total_ns: u64, pub max_ns: u64, pub avg_ns: u64, } type StatsMap = std::sync::Mutex, LocationStats>>; fn stats_map() -> &'static StatsMap { static MAP: OnceLock = OnceLock::new(); MAP.get_or_init(|| std::sync::Mutex::new(HashMap::new())) } fn record_hold_time(loc: &'static Location<'static>, duration_ns: u64) { let map = stats_map().lock().unwrap(); if let Some(stats) = map.get(&loc) { stats.record(duration_ns); return; } drop(map); // First time seeing this location — need write access let mut map = stats_map().lock().unwrap(); let stats = map.entry(loc).or_insert_with(LocationStats::new); stats.record(duration_ns); } /// Get a snapshot of all lock stats, sorted by max hold time (descending). pub fn lock_stats() -> Vec<(String, LockStats)> { let map = stats_map().lock().unwrap(); let mut stats: Vec<_> = map.iter() .map(|(loc, s)| (format!("{}:{}", loc.file(), loc.line()), s.snapshot())) .collect(); stats.sort_by(|a, b| b.1.max_ns.cmp(&a.1.max_ns)); stats } /// Reset all lock stats. pub fn reset_lock_stats() { let mut map = stats_map().lock().unwrap(); map.clear(); } // ── TrackedMutex ─────────────────────────────────────────────── /// A Mutex wrapper that tracks hold times by caller location. pub struct TrackedMutex { inner: Mutex, } impl TrackedMutex { pub fn new(value: T) -> Self { Self { inner: Mutex::new(value) } } #[track_caller] pub async fn lock(&self) -> TrackedMutexGuard<'_, T> { let location = Location::caller(); let guard = self.inner.lock().await; TrackedMutexGuard { guard, acquired_at: Instant::now(), location, } } #[track_caller] pub fn try_lock(&self) -> Result, tokio::sync::TryLockError> { let location = Location::caller(); let guard = self.inner.try_lock()?; Ok(TrackedMutexGuard { guard, acquired_at: Instant::now(), location, }) } } pub struct TrackedMutexGuard<'a, T> { guard: MutexGuard<'a, T>, acquired_at: Instant, location: &'static Location<'static>, } impl Drop for TrackedMutexGuard<'_, T> { fn drop(&mut self) { let duration = self.acquired_at.elapsed(); record_hold_time(self.location, duration.as_nanos() as u64); } } impl std::ops::Deref for TrackedMutexGuard<'_, T> { type Target = T; fn deref(&self) -> &T { &self.guard } } impl std::ops::DerefMut for TrackedMutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { &mut self.guard } } // ── TrackedRwLock ────────────────────────────────────────────── /// An RwLock wrapper that tracks hold times by caller location. pub struct TrackedRwLock { inner: RwLock, } impl TrackedRwLock { pub fn new(value: T) -> Self { Self { inner: RwLock::new(value) } } #[track_caller] pub async fn read(&self) -> TrackedRwLockReadGuard<'_, T> { let location = Location::caller(); let guard = self.inner.read().await; TrackedRwLockReadGuard { guard, acquired_at: Instant::now(), location, } } #[track_caller] pub async fn write(&self) -> TrackedRwLockWriteGuard<'_, T> { let location = Location::caller(); let guard = self.inner.write().await; TrackedRwLockWriteGuard { guard, acquired_at: Instant::now(), location, } } } pub struct TrackedRwLockReadGuard<'a, T> { guard: RwLockReadGuard<'a, T>, acquired_at: Instant, location: &'static Location<'static>, } impl Drop for TrackedRwLockReadGuard<'_, T> { fn drop(&mut self) { let duration = self.acquired_at.elapsed(); record_hold_time(self.location, duration.as_nanos() as u64); } } impl std::ops::Deref for TrackedRwLockReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { &self.guard } } pub struct TrackedRwLockWriteGuard<'a, T> { guard: RwLockWriteGuard<'a, T>, acquired_at: Instant, location: &'static Location<'static>, } impl Drop for TrackedRwLockWriteGuard<'_, T> { fn drop(&mut self) { let duration = self.acquired_at.elapsed(); record_hold_time(self.location, duration.as_nanos() as u64); } } impl std::ops::Deref for TrackedRwLockWriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { &self.guard } } impl std::ops::DerefMut for TrackedRwLockWriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { &mut self.guard } }