// Notification subsystem. // // Notifications have a type (free-form string, hierarchical by convention) // and an urgency level (0-3) set by the producer. The daemon maintains a // registry of all types ever seen with basic stats, and a per-type // threshold that controls when notifications interrupt vs queue. // // Producers submit via socket: `notify ` // Consumers query via socket: `notifications` (returns + clears pending above threshold) // // Thresholds: // 0 = ambient — include in idle context only // 1 = low — deliver on next check, don't interrupt focus // 2 = normal — deliver on next user interaction // 3 = urgent — interrupt immediately // // Type hierarchy is by convention: "irc.mention", "irc.channel.bcachefs-ai", // "telegram", "system.compaction". Threshold lookup walks up the hierarchy: // "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → default. use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; use tracing::info; use crate::home; pub const AMBIENT: u8 = 0; pub const LOW: u8 = 1; pub const NORMAL: u8 = 2; pub const URGENT: u8 = 3; const DEFAULT_THRESHOLD: u8 = NORMAL; /// Activity states that affect effective notification thresholds. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum Activity { /// Actively working with user — raise thresholds Focused, /// Idle, autonomous — lower thresholds Idle, /// Sleeping — only urgent gets through Sleeping, } fn state_path() -> PathBuf { home().join(".consciousness/notifications/state.json") } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TypeInfo { pub first_seen: f64, pub last_seen: f64, pub count: u64, /// Per-type threshold override. None = inherit from parent or default. pub threshold: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Notification { pub ntype: String, pub urgency: u8, pub message: String, pub timestamp: f64, } #[derive(Debug, Serialize, Deserialize, Default)] pub struct NotifyState { /// Registry of all notification types ever seen. pub types: BTreeMap, /// Pending notifications not yet delivered. #[serde(skip)] pub pending: Vec, /// Current activity state — affects effective thresholds. #[serde(skip)] pub activity: Activity, } impl Default for Activity { fn default() -> Self { Activity::Idle } } impl NotifyState { pub fn new() -> Self { let mut state = Self::default(); state.load(); state } /// Load type registry from disk. fn load(&mut self) { let path = state_path(); if let Ok(data) = fs::read_to_string(&path) { if let Ok(saved) = serde_json::from_str::(&data) { self.types = saved.types; info!("loaded {} notification types", self.types.len()); } } } /// Persist type registry to disk. fn save(&self) { let saved = SavedState { types: self.types.clone(), }; if let Ok(json) = serde_json::to_string_pretty(&saved) { let path = state_path(); if let Some(parent) = path.parent() { let _ = fs::create_dir_all(parent); } let _ = fs::write(path, json); } } /// Look up the configured threshold for a type, walking up the hierarchy. /// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → DEFAULT_THRESHOLD pub fn configured_threshold(&self, ntype: &str) -> u8 { let mut key = ntype; loop { if let Some(info) = self.types.get(key) { if let Some(t) = info.threshold { return t; } } match key.rfind('.') { Some(pos) => key = &key[..pos], None => return DEFAULT_THRESHOLD, } } } /// Effective threshold accounting for activity state. /// When focused, thresholds are raised (fewer interruptions). /// When sleeping, only urgent gets through. /// When idle, configured thresholds apply as-is. pub fn threshold_for(&self, ntype: &str) -> u8 { let base = self.configured_threshold(ntype); match self.activity { Activity::Focused => base.max(NORMAL), // at least normal when focused Activity::Sleeping => URGENT, // only urgent when sleeping Activity::Idle => base, // configured threshold when idle } } pub fn set_activity(&mut self, activity: Activity) { info!("activity: {:?} → {:?}", self.activity, activity); self.activity = activity; } /// Submit a notification. Returns true if it should interrupt now. pub fn submit(&mut self, ntype: String, urgency: u8, message: String) -> bool { let now = crate::now(); // Update type registry let info = self.types.entry(ntype.clone()).or_insert(TypeInfo { first_seen: now, last_seen: now, count: 0, threshold: None, }); info.last_seen = now; info.count += 1; self.save(); let threshold = self.threshold_for(&ntype); info!( "notification: type={ntype} urgency={urgency} threshold={threshold} msg={}", message.chars().take(80).collect::() ); self.pending.push(Notification { ntype, urgency, message, timestamp: now, }); urgency >= URGENT } /// Drain pending notifications at or above the given urgency level. /// Returns them and removes from pending. pub fn drain(&mut self, min_urgency: u8) -> Vec { let (matching, remaining): (Vec<_>, Vec<_>) = self .pending .drain(..) .partition(|n| n.urgency >= min_urgency); self.pending = remaining; matching } /// Drain all pending notifications above their per-type threshold. pub fn drain_deliverable(&mut self) -> Vec { // Pre-compute thresholds to avoid borrow conflict with drain let thresholds: Vec = self .pending .iter() .map(|n| self.threshold_for(&n.ntype)) .collect(); let mut deliver = Vec::new(); let mut keep = Vec::new(); for (n, threshold) in self.pending.drain(..).zip(thresholds) { if n.urgency >= threshold { deliver.push(n); } else { keep.push(n); } } self.pending = keep; deliver } /// Set threshold for a notification type. pub fn set_threshold(&mut self, ntype: &str, threshold: u8) { let now = crate::now(); let info = self.types.entry(ntype.to_string()).or_insert(TypeInfo { first_seen: now, last_seen: now, count: 0, threshold: None, }); info.threshold = Some(threshold); self.save(); info!("threshold: {ntype} = {threshold}"); } /// Format pending notifications for display. pub fn format_pending(&self, min_urgency: u8) -> String { let matching: Vec<_> = self .pending .iter() .filter(|n| n.urgency >= min_urgency) .collect(); if matching.is_empty() { return String::new(); } let mut out = format!("Pending notifications ({}):\n", matching.len()); for n in &matching { out.push_str(&format!("[{}] {}\n", n.ntype, n.message)); } out } /// Ingest notifications from legacy ~/.consciousness/notifications/ files. /// Maps filename to notification type, assumes NORMAL urgency. pub fn ingest_legacy_files(&mut self) { let dir = home().join(".consciousness/notifications"); let entries = match fs::read_dir(&dir) { Ok(e) => e, Err(_) => return, }; for entry in entries.flatten() { let name = entry.file_name().to_string_lossy().to_string(); if name.starts_with('.') || name == "state.json" { continue; } let path = entry.path(); if !path.is_file() { continue; } let content = match fs::read_to_string(&path) { Ok(c) if !c.is_empty() => c, _ => continue, }; // Each line is a separate notification for line in content.lines() { if !line.is_empty() { self.submit(name.clone(), NORMAL, line.to_string()); } } // Clear the file let _ = fs::write(&path, ""); } } } /// What gets persisted to disk (just the type registry, not pending). #[derive(Serialize, Deserialize)] struct SavedState { types: BTreeMap, } /// Format an urgency level as a human-readable string. pub fn urgency_name(level: u8) -> &'static str { match level { 0 => "ambient", 1 => "low", 2 => "normal", 3 => "urgent", _ => "unknown", } } /// Parse an urgency level from a string (name or number). pub fn parse_urgency(s: &str) -> Option { match s { "ambient" | "0" => Some(AMBIENT), "low" | "1" => Some(LOW), "normal" | "2" => Some(NORMAL), "urgent" | "3" => Some(URGENT), _ => None, } }