rename: poc-agent → agent, poc-daemon → thalamus
The thalamus: sensory relay, always-on routing. Perfect name for the daemon that bridges IRC, Telegram, and the agent. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
998b71e52c
commit
cfed85bd20
105 changed files with 0 additions and 0 deletions
315
thalamus/src/notify.rs
Normal file
315
thalamus/src/notify.rs
Normal file
|
|
@ -0,0 +1,315 @@
|
|||
// Notification subsystem.
|
||||
//
|
||||
// Notifications have a type (free-form string, hierarchical by convention)
|
||||
// and an urgency level (0-3) set by the producer. The daemon maintains a
|
||||
// registry of all types ever seen with basic stats, and a per-type
|
||||
// threshold that controls when notifications interrupt vs queue.
|
||||
//
|
||||
// Producers submit via socket: `notify <type> <urgency> <message>`
|
||||
// Consumers query via socket: `notifications` (returns + clears pending above threshold)
|
||||
//
|
||||
// Thresholds:
|
||||
// 0 = ambient — include in idle context only
|
||||
// 1 = low — deliver on next check, don't interrupt focus
|
||||
// 2 = normal — deliver on next user interaction
|
||||
// 3 = urgent — interrupt immediately
|
||||
//
|
||||
// Type hierarchy is by convention: "irc.mention", "irc.channel.bcachefs-ai",
|
||||
// "telegram", "system.compaction". Threshold lookup walks up the hierarchy:
|
||||
// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → default.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
|
||||
use crate::home;
|
||||
|
||||
pub const AMBIENT: u8 = 0;
|
||||
pub const LOW: u8 = 1;
|
||||
pub const NORMAL: u8 = 2;
|
||||
pub const URGENT: u8 = 3;
|
||||
|
||||
const DEFAULT_THRESHOLD: u8 = NORMAL;
|
||||
|
||||
/// Activity states that affect effective notification thresholds.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum Activity {
|
||||
/// Actively working with user — raise thresholds
|
||||
Focused,
|
||||
/// Idle, autonomous — lower thresholds
|
||||
Idle,
|
||||
/// Sleeping — only urgent gets through
|
||||
Sleeping,
|
||||
}
|
||||
|
||||
fn state_path() -> PathBuf {
|
||||
home().join(".claude/notifications/state.json")
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TypeInfo {
|
||||
pub first_seen: f64,
|
||||
pub last_seen: f64,
|
||||
pub count: u64,
|
||||
/// Per-type threshold override. None = inherit from parent or default.
|
||||
pub threshold: Option<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Notification {
|
||||
pub ntype: String,
|
||||
pub urgency: u8,
|
||||
pub message: String,
|
||||
pub timestamp: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
pub struct NotifyState {
|
||||
/// Registry of all notification types ever seen.
|
||||
pub types: BTreeMap<String, TypeInfo>,
|
||||
/// Pending notifications not yet delivered.
|
||||
#[serde(skip)]
|
||||
pub pending: Vec<Notification>,
|
||||
/// Current activity state — affects effective thresholds.
|
||||
#[serde(skip)]
|
||||
pub activity: Activity,
|
||||
}
|
||||
|
||||
impl Default for Activity {
|
||||
fn default() -> Self {
|
||||
Activity::Idle
|
||||
}
|
||||
}
|
||||
|
||||
impl NotifyState {
|
||||
pub fn new() -> Self {
|
||||
let mut state = Self::default();
|
||||
state.load();
|
||||
state
|
||||
}
|
||||
|
||||
/// Load type registry from disk.
|
||||
fn load(&mut self) {
|
||||
let path = state_path();
|
||||
if let Ok(data) = fs::read_to_string(&path) {
|
||||
if let Ok(saved) = serde_json::from_str::<SavedState>(&data) {
|
||||
self.types = saved.types;
|
||||
info!("loaded {} notification types", self.types.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Persist type registry to disk.
|
||||
fn save(&self) {
|
||||
let saved = SavedState {
|
||||
types: self.types.clone(),
|
||||
};
|
||||
if let Ok(json) = serde_json::to_string_pretty(&saved) {
|
||||
let path = state_path();
|
||||
if let Some(parent) = path.parent() {
|
||||
let _ = fs::create_dir_all(parent);
|
||||
}
|
||||
let _ = fs::write(path, json);
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up the configured threshold for a type, walking up the hierarchy.
|
||||
/// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → DEFAULT_THRESHOLD
|
||||
pub fn configured_threshold(&self, ntype: &str) -> u8 {
|
||||
let mut key = ntype;
|
||||
loop {
|
||||
if let Some(info) = self.types.get(key) {
|
||||
if let Some(t) = info.threshold {
|
||||
return t;
|
||||
}
|
||||
}
|
||||
match key.rfind('.') {
|
||||
Some(pos) => key = &key[..pos],
|
||||
None => return DEFAULT_THRESHOLD,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Effective threshold accounting for activity state.
|
||||
/// When focused, thresholds are raised (fewer interruptions).
|
||||
/// When sleeping, only urgent gets through.
|
||||
/// When idle, configured thresholds apply as-is.
|
||||
pub fn threshold_for(&self, ntype: &str) -> u8 {
|
||||
let base = self.configured_threshold(ntype);
|
||||
match self.activity {
|
||||
Activity::Focused => base.max(NORMAL), // at least normal when focused
|
||||
Activity::Sleeping => URGENT, // only urgent when sleeping
|
||||
Activity::Idle => base, // configured threshold when idle
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_activity(&mut self, activity: Activity) {
|
||||
info!("activity: {:?} → {:?}", self.activity, activity);
|
||||
self.activity = activity;
|
||||
}
|
||||
|
||||
/// Submit a notification. Returns true if it should interrupt now.
|
||||
pub fn submit(&mut self, ntype: String, urgency: u8, message: String) -> bool {
|
||||
let now = crate::now();
|
||||
|
||||
// Update type registry
|
||||
let info = self.types.entry(ntype.clone()).or_insert(TypeInfo {
|
||||
first_seen: now,
|
||||
last_seen: now,
|
||||
count: 0,
|
||||
threshold: None,
|
||||
});
|
||||
info.last_seen = now;
|
||||
info.count += 1;
|
||||
self.save();
|
||||
|
||||
let threshold = self.threshold_for(&ntype);
|
||||
|
||||
info!(
|
||||
"notification: type={ntype} urgency={urgency} threshold={threshold} msg={}",
|
||||
message.chars().take(80).collect::<String>()
|
||||
);
|
||||
|
||||
self.pending.push(Notification {
|
||||
ntype,
|
||||
urgency,
|
||||
message,
|
||||
timestamp: now,
|
||||
});
|
||||
|
||||
urgency >= URGENT
|
||||
}
|
||||
|
||||
/// Drain pending notifications at or above the given urgency level.
|
||||
/// Returns them and removes from pending.
|
||||
pub fn drain(&mut self, min_urgency: u8) -> Vec<Notification> {
|
||||
let (matching, remaining): (Vec<_>, Vec<_>) = self
|
||||
.pending
|
||||
.drain(..)
|
||||
.partition(|n| n.urgency >= min_urgency);
|
||||
self.pending = remaining;
|
||||
matching
|
||||
}
|
||||
|
||||
/// Drain all pending notifications above their per-type threshold.
|
||||
pub fn drain_deliverable(&mut self) -> Vec<Notification> {
|
||||
// Pre-compute thresholds to avoid borrow conflict with drain
|
||||
let thresholds: Vec<u8> = self
|
||||
.pending
|
||||
.iter()
|
||||
.map(|n| self.threshold_for(&n.ntype))
|
||||
.collect();
|
||||
|
||||
let mut deliver = Vec::new();
|
||||
let mut keep = Vec::new();
|
||||
|
||||
for (n, threshold) in self.pending.drain(..).zip(thresholds) {
|
||||
if n.urgency >= threshold {
|
||||
deliver.push(n);
|
||||
} else {
|
||||
keep.push(n);
|
||||
}
|
||||
}
|
||||
|
||||
self.pending = keep;
|
||||
deliver
|
||||
}
|
||||
|
||||
/// Set threshold for a notification type.
|
||||
pub fn set_threshold(&mut self, ntype: &str, threshold: u8) {
|
||||
let now = crate::now();
|
||||
let info = self.types.entry(ntype.to_string()).or_insert(TypeInfo {
|
||||
first_seen: now,
|
||||
last_seen: now,
|
||||
count: 0,
|
||||
threshold: None,
|
||||
});
|
||||
info.threshold = Some(threshold);
|
||||
self.save();
|
||||
info!("threshold: {ntype} = {threshold}");
|
||||
}
|
||||
|
||||
/// Format pending notifications for display.
|
||||
pub fn format_pending(&self, min_urgency: u8) -> String {
|
||||
let matching: Vec<_> = self
|
||||
.pending
|
||||
.iter()
|
||||
.filter(|n| n.urgency >= min_urgency)
|
||||
.collect();
|
||||
|
||||
if matching.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
|
||||
let mut out = format!("Pending notifications ({}):\n", matching.len());
|
||||
for n in &matching {
|
||||
out.push_str(&format!("[{}] {}\n", n.ntype, n.message));
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// Ingest notifications from legacy ~/.claude/notifications/ files.
|
||||
/// Maps filename to notification type, assumes NORMAL urgency.
|
||||
pub fn ingest_legacy_files(&mut self) {
|
||||
let dir = home().join(".claude/notifications");
|
||||
let entries = match fs::read_dir(&dir) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
for entry in entries.flatten() {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
if name.starts_with('.') || name == "state.json" {
|
||||
continue;
|
||||
}
|
||||
let path = entry.path();
|
||||
if !path.is_file() {
|
||||
continue;
|
||||
}
|
||||
let content = match fs::read_to_string(&path) {
|
||||
Ok(c) if !c.is_empty() => c,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Each line is a separate notification
|
||||
for line in content.lines() {
|
||||
if !line.is_empty() {
|
||||
self.submit(name.clone(), NORMAL, line.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the file
|
||||
let _ = fs::write(&path, "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// What gets persisted to disk (just the type registry, not pending).
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct SavedState {
|
||||
types: BTreeMap<String, TypeInfo>,
|
||||
}
|
||||
|
||||
/// Format an urgency level as a human-readable string.
|
||||
pub fn urgency_name(level: u8) -> &'static str {
|
||||
match level {
|
||||
0 => "ambient",
|
||||
1 => "low",
|
||||
2 => "normal",
|
||||
3 => "urgent",
|
||||
_ => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse an urgency level from a string (name or number).
|
||||
pub fn parse_urgency(s: &str) -> Option<u8> {
|
||||
match s {
|
||||
"ambient" | "0" => Some(AMBIENT),
|
||||
"low" | "1" => Some(LOW),
|
||||
"normal" | "2" => Some(NORMAL),
|
||||
"urgent" | "3" => Some(URGENT),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue