From 604f4422159cd6d66c683d3c49e6e6c6448f5144 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Fri, 3 Apr 2026 17:31:17 -0400 Subject: [PATCH] Move thalamus subcrate into main crate Move thalamus/ (poc-daemon) source files into src/thalamus/ as a module of the main crate. The daemon entry point becomes a library function thalamus::run() with a thin poc-daemon binary for backward compatibility. - Copy thalamus source into src/thalamus/, fix crate:: -> super:: - Copy daemon.capnp into schema/, add to build.rs - Re-export daemon_capnp at crate root (capnp codegen requires it) - Add thalamus dependencies (capnp-rpc, tokio-util, toml, rustls, etc.) - Keep thalamus/ subcrate for comparison Co-Authored-By: Proof of Concept --- Cargo.lock | 9 + Cargo.toml | 13 + build.rs | 7 +- schema/daemon.capnp | 84 ++++ src/bin/poc-daemon.rs | 14 + src/lib.rs | 6 + src/thalamus/config.rs | 97 +++++ src/thalamus/context.rs | 19 + src/thalamus/idle.rs | 641 +++++++++++++++++++++++++++++++ src/thalamus/mod.rs | 617 +++++++++++++++++++++++++++++ src/thalamus/modules/irc.rs | 569 +++++++++++++++++++++++++++ src/thalamus/modules/mod.rs | 2 + src/thalamus/modules/telegram.rs | 374 ++++++++++++++++++ src/thalamus/notify.rs | 315 +++++++++++++++ src/thalamus/rpc.rs | 429 +++++++++++++++++++++ src/thalamus/tmux.rs | 54 +++ 16 files changed, 3249 insertions(+), 1 deletion(-) create mode 100644 schema/daemon.capnp create mode 100644 src/bin/poc-daemon.rs create mode 100644 src/thalamus/config.rs create mode 100644 src/thalamus/context.rs create mode 100644 src/thalamus/idle.rs create mode 100644 src/thalamus/mod.rs create mode 100644 src/thalamus/modules/irc.rs create mode 100644 src/thalamus/modules/mod.rs create mode 100644 src/thalamus/modules/telegram.rs create mode 100644 src/thalamus/notify.rs create mode 100644 src/thalamus/rpc.rs create mode 100644 src/thalamus/tmux.rs diff --git a/Cargo.lock b/Cargo.lock index 82f3ce2..c649a12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2880,6 +2880,7 @@ dependencies = [ "base64 0.22.1", "bincode", "capnp", + "capnp-rpc", "capnpc", "chrono", "clap", @@ -2904,16 +2905,24 @@ dependencies = [ "regex", "reqwest", "rkyv", + "rustls", "serde", "serde_json", "skillratings", "tiktoken-rs", "tokio", + "tokio-rustls", + "tokio-util", + "toml", + "tracing", + "tracing-appender", + "tracing-subscriber", "tui-markdown", "tui-textarea-2", "unicode-width", "uuid", "walkdir", + "webpki-roots", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 57986ce..7345041 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,15 @@ log = "0.4" ratatui = { version = "0.30", features = ["unstable-rendered-line-info"] } skillratings = "0.28" crossterm = { version = "0.29", features = ["event-stream"] } +capnp-rpc = "0.20" +tokio-util = { version = "0.7", features = ["compat"] } +toml = "0.8" +tokio-rustls = "0.26" +rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } +webpki-roots = "1" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-appender = "0.2" [build-dependencies] capnpc = "0.20" @@ -91,3 +100,7 @@ path = "src/bin/find-deleted.rs" [[bin]] name = "consciousness" path = "src/bin/consciousness.rs" + +[[bin]] +name = "poc-daemon" +path = "src/bin/poc-daemon.rs" diff --git a/build.rs b/build.rs index dcc6bb1..02c0e4b 100644 --- a/build.rs +++ b/build.rs @@ -2,5 +2,10 @@ fn main() { capnpc::CompilerCommand::new() .file("schema/memory.capnp") .run() - .expect("capnp compile failed"); + .expect("capnp compile failed (memory.capnp)"); + + capnpc::CompilerCommand::new() + .file("schema/daemon.capnp") + .run() + .expect("capnp compile failed (daemon.capnp)"); } diff --git a/schema/daemon.capnp b/schema/daemon.capnp new file mode 100644 index 0000000..eb436da --- /dev/null +++ b/schema/daemon.capnp @@ -0,0 +1,84 @@ +@0xb8e2f4a1c3d56789; + +# Claude daemon RPC interface. +# +# Served over a Unix domain socket. Clients connect, bootstrap +# the Daemon interface, make calls, disconnect. + +struct Notification { + type @0 :Text; + urgency @1 :UInt8; + message @2 :Text; + timestamp @3 :Float64; +} + +struct TypeInfo { + name @0 :Text; + count @1 :UInt64; + firstSeen @2 :Float64; + lastSeen @3 :Float64; + threshold @4 :Int8; # -1 = inherit, 0-3 = explicit level +} + +enum Activity { + idle @0; + focused @1; + sleeping @2; +} + +struct Status { + lastUserMsg @0 :Float64; + lastResponse @1 :Float64; + claudePane @2 :Text; + sleepUntil @3 :Float64; # 0 = not sleeping, -1 = indefinite + quietUntil @4 :Float64; + consolidating @5 :Bool; + dreaming @6 :Bool; + fired @7 :Bool; + userPresent @8 :Bool; + uptime @9 :Float64; + activity @10 :Activity; + pendingCount @11 :UInt32; + idleTimeout @12 :Float64; # configured idle timeout (secs) + notifyTimeout @13 :Float64; # configured notify-via-tmux timeout (secs) + sinceActivity @14 :Float64; # secs since max(lastUserMsg, lastResponse) + sinceUser @15 :Float64; # secs since lastUserMsg + blockReason @16 :Text; # why idle timer hasn't fired + activityEwma @17 :Float64; # 0-1, EWMA of recent activity (running fraction) +} + +interface Daemon { + # Idle timer + user @0 (pane :Text) -> (); + response @1 (pane :Text) -> (); + sleep @2 (until :Float64) -> (); # 0 = indefinite + wake @3 () -> (); + quiet @4 (seconds :UInt32) -> (); + consolidating @5 () -> (); + consolidated @6 () -> (); + dreamStart @7 () -> (); + dreamEnd @8 () -> (); + stop @9 () -> (); + status @10 () -> (status :Status); + + # Notifications + notify @11 (notification :Notification) -> (interrupt :Bool); + getNotifications @12 (minUrgency :UInt8) -> (notifications :List(Notification)); + getTypes @13 () -> (types :List(TypeInfo)); + setThreshold @14 (type :Text, level :UInt8) -> (); + + idleTimeout @16 (seconds :Float64) -> (); + notifyTimeout @19 (seconds :Float64) -> (); + save @17 () -> (); + debug @18 () -> (json :Text); + + ewma @20 (value :Float64) -> (current :Float64); + afk @21 () -> (); + sessionTimeout @22 (seconds :Float64) -> (); + + testNudge @23 () -> (sent :Bool, message :Text); + + # Modules + moduleCommand @15 (module :Text, command :Text, args :List(Text)) + -> (result :Text); +} diff --git a/src/bin/poc-daemon.rs b/src/bin/poc-daemon.rs new file mode 100644 index 0000000..730dfd6 --- /dev/null +++ b/src/bin/poc-daemon.rs @@ -0,0 +1,14 @@ +// poc-daemon — backward-compatible entry point +// +// Delegates to the thalamus module in the main crate. +// The daemon is now part of the consciousness binary but this +// entry point is kept for compatibility with existing scripts. + +use clap::Parser; +use poc_memory::thalamus; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let cli = thalamus::Cli::parse(); + thalamus::run(cli.command).await +} diff --git a/src/lib.rs b/src/lib.rs index dbe7fdb..8b81d78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,12 @@ pub mod cli; // TUI for memory-search pub mod tui; +// Thalamus — notification routing and idle management daemon +pub mod thalamus; + +// Re-export at crate root — capnp codegen emits `crate::daemon_capnp::` paths +pub use thalamus::daemon_capnp; + // Generated capnp bindings pub mod memory_capnp { include!(concat!(env!("OUT_DIR"), "/schema/memory_capnp.rs")); diff --git a/src/thalamus/config.rs b/src/thalamus/config.rs new file mode 100644 index 0000000..8f1668d --- /dev/null +++ b/src/thalamus/config.rs @@ -0,0 +1,97 @@ +// Daemon configuration. +// +// Lives at ~/.consciousness/daemon.toml. Loaded on startup, updated at +// runtime when modules change state (join channel, etc.). + +use super::home; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::path::PathBuf; + +fn config_path() -> PathBuf { + home().join(".consciousness/daemon.toml") +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct Config { + #[serde(default)] + pub irc: IrcConfig, + #[serde(default)] + pub telegram: TelegramConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IrcConfig { + pub enabled: bool, + pub server: String, + pub port: u16, + pub tls: bool, + pub nick: String, + pub user: String, + pub realname: String, + pub channels: Vec, +} + +impl Default for IrcConfig { + fn default() -> Self { + Self { + enabled: true, + server: "irc.libera.chat".into(), + port: 6697, + tls: true, + nick: "agent".into(), + user: "agent".into(), + realname: "agent".into(), + channels: vec!["#bcachefs".into(), "#bcachefs-ai".into()], + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelegramConfig { + pub enabled: bool, + pub token: String, + pub chat_id: i64, +} + +impl Default for TelegramConfig { + fn default() -> Self { + // Load token and chat_id from legacy files if they exist + let token = std::fs::read_to_string(home().join(".consciousness/telegram/token")) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + let chat_id = std::fs::read_to_string(home().join(".consciousness/telegram/chat_id")) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + Self { + enabled: !token.is_empty() && chat_id != 0, + token, + chat_id, + } + } +} + +impl Config { + pub fn load() -> Self { + let path = config_path(); + match fs::read_to_string(&path) { + Ok(data) => toml::from_str(&data).unwrap_or_else(|e| { + tracing::warn!("bad config {}: {e}, using defaults", path.display()); + Self::default() + }), + Err(_) => { + let config = Self::default(); + config.save(); + config + } + } + } + + pub fn save(&self) { + let path = config_path(); + if let Ok(data) = toml::to_string_pretty(self) { + let _ = fs::write(path, data); + } + } +} diff --git a/src/thalamus/context.rs b/src/thalamus/context.rs new file mode 100644 index 0000000..22b716a --- /dev/null +++ b/src/thalamus/context.rs @@ -0,0 +1,19 @@ +// Context gathering for idle prompts. +// +// Notifications are handled by the notify module and passed +// in separately by the caller. Git context and IRC digest +// are now available through where-am-i.md and the memory graph. + +/// Build context string for a prompt. +/// notification_text is passed in from the notify module. +pub fn build(_include_irc: bool, notification_text: &str) -> String { + // Keep nudges short — Claude checks notifications via + // `poc-daemon status` on its own. Just mention the count. + let count = notification_text.matches("[irc.").count() + + notification_text.matches("[telegram.").count(); + if count > 0 { + format!("{count} pending notifications") + } else { + String::new() + } +} diff --git a/src/thalamus/idle.rs b/src/thalamus/idle.rs new file mode 100644 index 0000000..88e96fb --- /dev/null +++ b/src/thalamus/idle.rs @@ -0,0 +1,641 @@ +// Idle timer module. +// +// Tracks user presence and Claude response times. When Claude has been +// idle too long, sends a contextual prompt to the tmux pane. Handles +// sleep mode, quiet mode, consolidation suppression, and dream nudges. +// +// Designed as the first "module" — future IRC/Telegram modules will +// follow the same pattern: state + tick + handle_command. + +use super::{context, home, now, notify, tmux}; +use serde::{Deserialize, Serialize}; +use std::fs; +use tracing::info; + +// Defaults +const DEFAULT_IDLE_TIMEOUT: f64 = 5.0 * 60.0; +const DEFAULT_NOTIFY_TIMEOUT: f64 = 2.0 * 60.0; +const DEFAULT_SESSION_ACTIVE_SECS: f64 = 15.0 * 60.0; +const DREAM_INTERVAL_HOURS: u64 = 18; + +/// EWMA decay half-life in seconds (5 minutes). +const EWMA_DECAY_HALF_LIFE: f64 = 5.0 * 60.0; + +/// Minimum seconds between autonomous nudges. +const MIN_NUDGE_INTERVAL: f64 = 15.0; + +/// Boost half-life in seconds (60s). A 60s turn covers half the gap to +/// target; a 15s turn covers ~16%; a 2s turn covers ~2%. +const EWMA_BOOST_HALF_LIFE: f64 = 60.0; + +/// Steady-state target for active work. The EWMA converges toward this +/// during sustained activity rather than toward 1.0. +const EWMA_TARGET: f64 = 0.75; + +/// Persisted subset of daemon state — survives daemon restarts. +/// Includes both epoch floats (for computation) and ISO timestamps +/// (for human debugging via `cat daemon-state.json | jq`). +#[derive(Serialize, Deserialize, Default)] +struct Persisted { + last_user_msg: f64, + last_response: f64, + #[serde(default)] + sleep_until: Option, + #[serde(default)] + claude_pane: Option, + #[serde(default)] + idle_timeout: f64, + #[serde(default)] + notify_timeout: f64, + #[serde(default)] + activity_ewma: f64, + #[serde(default)] + ewma_updated_at: f64, + #[serde(default)] + session_active_secs: f64, + #[serde(default)] + in_turn: bool, + #[serde(default)] + turn_start: f64, + #[serde(default)] + last_nudge: f64, + // Human-readable mirrors — written but not consumed on load + #[serde(default, skip_deserializing)] + last_user_msg_time: String, + #[serde(default, skip_deserializing)] + last_response_time: String, + #[serde(default, skip_deserializing)] + saved_at: String, + #[serde(default, skip_deserializing)] + fired: bool, + #[serde(default, skip_deserializing)] + uptime: f64, +} + +fn state_path() -> std::path::PathBuf { + home().join(".consciousness/daemon-state.json") +} + +/// Compute EWMA decay factor: 0.5^(elapsed / half_life). +fn ewma_factor(elapsed: f64, half_life: f64) -> f64 { + (0.5_f64).powf(elapsed / half_life) +} + +/// Format epoch seconds as a human-readable ISO-ish timestamp. +fn epoch_to_iso(epoch: f64) -> String { + if epoch == 0.0 { + return String::new(); + } + let secs = epoch as u64; + // Use date command — simple and correct for timezone + std::process::Command::new("date") + .args(["-d", &format!("@{secs}"), "+%Y-%m-%dT%H:%M:%S%z"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_default() +} + +#[derive(Serialize)] +pub struct State { + pub last_user_msg: f64, + pub last_response: f64, + pub claude_pane: Option, + pub sleep_until: Option, // None=awake, 0=indefinite, >0=timestamp + pub quiet_until: f64, + pub consolidating: bool, + pub dreaming: bool, + pub dream_start: f64, + pub fired: bool, + pub idle_timeout: f64, + pub notify_timeout: f64, + pub activity_ewma: f64, + pub ewma_updated_at: f64, + pub session_active_secs: f64, + pub in_turn: bool, + pub turn_start: f64, + pub last_nudge: f64, + #[serde(skip)] + pub running: bool, + #[serde(skip)] + pub start_time: f64, + #[serde(skip)] + pub notifications: notify::NotifyState, +} + +impl State { + pub fn new() -> Self { + Self { + last_user_msg: 0.0, + last_response: 0.0, + claude_pane: None, + sleep_until: None, + quiet_until: 0.0, + consolidating: false, + dreaming: false, + dream_start: 0.0, + fired: false, + idle_timeout: DEFAULT_IDLE_TIMEOUT, + notify_timeout: DEFAULT_NOTIFY_TIMEOUT, + session_active_secs: DEFAULT_SESSION_ACTIVE_SECS, + activity_ewma: 0.0, + ewma_updated_at: now(), + in_turn: false, + turn_start: 0.0, + last_nudge: 0.0, + running: true, + start_time: now(), + notifications: notify::NotifyState::new(), + } + } + + pub fn load(&mut self) { + if let Ok(data) = fs::read_to_string(state_path()) { + if let Ok(p) = serde_json::from_str::(&data) { + self.sleep_until = p.sleep_until; + self.claude_pane = p.claude_pane; + if p.idle_timeout > 0.0 { + self.idle_timeout = p.idle_timeout; + } + if p.notify_timeout > 0.0 { + self.notify_timeout = p.notify_timeout; + } + if p.session_active_secs > 0.0 { + self.session_active_secs = p.session_active_secs; + } + // Reset activity timestamps to now — timers count from + // restart, not from stale pre-restart state + let t = now(); + self.last_user_msg = t; + self.last_response = t; + // Restore EWMA state, applying decay for time spent shut down + if p.ewma_updated_at > 0.0 { + let elapsed = t - p.ewma_updated_at; + self.activity_ewma = p.activity_ewma * ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE); + self.in_turn = p.in_turn; + self.turn_start = p.turn_start; + self.last_nudge = p.last_nudge; + } + self.ewma_updated_at = t; + } + } + + // Always try to find the active pane + if self.claude_pane.is_none() { + self.claude_pane = tmux::find_claude_pane(); + } + + info!( + "loaded: user={:.0} resp={:.0} pane={:?} sleep={:?}", + self.last_user_msg, self.last_response, self.claude_pane, self.sleep_until, + ); + } + + pub fn save(&self) { + let p = Persisted { + last_user_msg: self.last_user_msg, + last_response: self.last_response, + sleep_until: self.sleep_until, + claude_pane: self.claude_pane.clone(), + last_user_msg_time: epoch_to_iso(self.last_user_msg), + last_response_time: epoch_to_iso(self.last_response), + saved_at: epoch_to_iso(now()), + fired: self.fired, + idle_timeout: self.idle_timeout, + notify_timeout: self.notify_timeout, + session_active_secs: self.session_active_secs, + activity_ewma: self.activity_ewma, + ewma_updated_at: self.ewma_updated_at, + in_turn: self.in_turn, + turn_start: self.turn_start, + last_nudge: self.last_nudge, + uptime: now() - self.start_time, + }; + if let Ok(json) = serde_json::to_string_pretty(&p) { + let _ = fs::write(state_path(), json); + } + } + + /// Decay the activity EWMA toward zero based on elapsed time. + fn decay_ewma(&mut self) { + let t = now(); + let elapsed = t - self.ewma_updated_at; + if elapsed <= 0.0 { + return; + } + self.activity_ewma *= ewma_factor(elapsed, EWMA_DECAY_HALF_LIFE); + self.ewma_updated_at = t; + } + + /// Boost the EWMA based on turn duration. The boost is proportional to + /// distance from EWMA_TARGET, scaled by a saturation curve on duration. + /// A 15s turn covers half the gap to target; a 2s turn barely registers. + /// Self-limiting: converges toward target, can't overshoot. + fn boost_ewma(&mut self, turn_duration: f64) { + let gap = (EWMA_TARGET - self.activity_ewma).max(0.0); + let saturation = 1.0 - ewma_factor(turn_duration, EWMA_BOOST_HALF_LIFE); + self.activity_ewma += gap * saturation; + } + + // Typed handlers for RPC + pub fn handle_user(&mut self, pane: &str) { + self.decay_ewma(); + self.in_turn = true; + self.turn_start = now(); + let from_user = !self.fired; + if from_user { + self.last_user_msg = now(); + self.notifications.set_activity(notify::Activity::Focused); + } + self.fired = false; + if !pane.is_empty() { + self.claude_pane = Some(pane.to_string()); + } + self.save(); + info!("user (pane={}, from_user={from_user}) ewma={:.3}", + if pane.is_empty() { "unchanged" } else { pane }, + self.activity_ewma); + } + + pub fn handle_response(&mut self, pane: &str) { + let turn_duration = now() - self.turn_start; + self.decay_ewma(); + self.boost_ewma(turn_duration); + self.in_turn = false; + self.last_response = now(); + self.fired = false; + if !pane.is_empty() { + self.claude_pane = Some(pane.to_string()); + } + self.save(); + info!("response (turn={:.1}s) ewma={:.3}", turn_duration, self.activity_ewma); + } + + /// Check if a notification should trigger a tmux prompt. + /// Called when a notification arrives via module channel. + /// Only injects into tmux when idle — if there's an active session + /// (recent user or response), the hook delivers via additionalContext. + pub fn maybe_prompt_notification(&mut self, ntype: &str, urgency: u8, _message: &str) { + if self.user_present() { + return; // hook will deliver it on next prompt + } + // If we've responded recently, the session is active — + // notifications will arrive via hook, no need to wake us + let since_response = now() - self.last_response; + if since_response < self.notify_timeout { + return; + } + // Don't send notifications via tmux directly — they flow + // through the idle nudge. Urgent notifications reset the + // idle timer so the nudge fires sooner. + let effective = self.notifications.threshold_for(ntype); + if urgency >= effective && self.fired { + // Bump: allow the nudge to re-fire for urgent notifications + self.fired = false; + } + } + + pub fn handle_afk(&mut self) { + // Push last_user_msg far enough back that user_present() returns false + self.last_user_msg = now() - self.session_active_secs - 1.0; + self.fired = false; // allow idle timer to fire again + info!("User marked AFK"); + self.save(); + } + + pub fn handle_session_timeout(&mut self, secs: f64) { + self.session_active_secs = secs; + info!("session active timeout = {secs}s"); + self.save(); + } + + pub fn handle_idle_timeout(&mut self, secs: f64) { + self.idle_timeout = secs; + self.save(); + info!("idle timeout = {secs}s"); + } + + pub fn handle_ewma(&mut self, value: f64) -> f64 { + if value >= 0.0 { + self.activity_ewma = value.min(1.0); + self.ewma_updated_at = now(); + self.save(); + info!("ewma set to {:.3}", self.activity_ewma); + } + self.activity_ewma + } + + pub fn handle_notify_timeout(&mut self, secs: f64) { + self.notify_timeout = secs; + self.save(); + info!("notify timeout = {secs}s"); + } + + pub fn handle_sleep(&mut self, until: f64) { + if until == 0.0 { + self.sleep_until = Some(0.0); + info!("sleep indefinitely"); + } else { + self.sleep_until = Some(until); + info!("sleep until {until}"); + } + self.notifications.set_activity(notify::Activity::Sleeping); + self.save(); + } + + pub fn handle_wake(&mut self) { + self.sleep_until = None; + self.fired = false; + self.save(); + info!("wake"); + } + + pub fn handle_quiet(&mut self, seconds: u32) { + self.quiet_until = now() + seconds as f64; + info!("quiet {seconds}s"); + } + + pub fn user_present(&self) -> bool { + (now() - self.last_user_msg) < self.session_active_secs + } + + /// Seconds since the most recent of user message or response. + pub fn since_activity(&self) -> f64 { + let reference = self.last_response.max(self.last_user_msg); + if reference > 0.0 { now() - reference } else { 0.0 } + } + + /// Why the idle timer hasn't fired (or "none" if it would fire now). + pub fn block_reason(&self) -> &'static str { + let t = now(); + if self.fired { + "already fired" + } else if self.sleep_until.is_some() { + "sleeping" + } else if t < self.quiet_until { + "quiet mode" + } else if self.consolidating { + "consolidating" + } else if self.dreaming { + "dreaming" + } else if self.user_present() { + "user present" + } else if self.in_turn { + "in turn" + } else if self.last_response.max(self.last_user_msg) == 0.0 { + "no activity yet" + } else if self.since_activity() < self.idle_timeout { + "not idle long enough" + } else { + "none — would fire" + } + } + + /// Full debug dump as JSON with computed values. + pub fn debug_json(&self) -> String { + let t = now(); + let since_user = t - self.last_user_msg; + let since_response = t - self.last_response; + + serde_json::json!({ + "now": t, + "uptime": t - self.start_time, + "idle_timeout": self.idle_timeout, + "notify_timeout": self.notify_timeout, + "last_user_msg": self.last_user_msg, + "last_user_msg_ago": since_user, + "last_user_msg_time": epoch_to_iso(self.last_user_msg), + "last_response": self.last_response, + "last_response_ago": since_response, + "last_response_time": epoch_to_iso(self.last_response), + "since_activity": self.since_activity(), + "activity_ewma": self.activity_ewma, + "in_turn": self.in_turn, + "turn_start": self.turn_start, + "user_present": self.user_present(), + "claude_pane": self.claude_pane, + "fired": self.fired, + "block_reason": self.block_reason(), + "sleep_until": self.sleep_until, + "quiet_until": self.quiet_until, + "consolidating": self.consolidating, + "dreaming": self.dreaming, + "dream_start": self.dream_start, + "activity": format!("{:?}", self.notifications.activity), + "pending_notifications": self.notifications.pending.len(), + "notification_types": self.notifications.types.len(), + }).to_string() + } + + pub fn send(&self, msg: &str) -> bool { + let pane = match &self.claude_pane { + Some(p) => p.clone(), + None => match tmux::find_claude_pane() { + Some(p) => p, + None => { + info!("send: no claude pane found"); + return false; + } + }, + }; + + let ok = tmux::send_prompt(&pane, msg); + let preview: String = msg.chars().take(80).collect(); + info!("send(pane={pane}, ok={ok}): {preview}"); + ok + } + + fn check_dream_nudge(&self) -> bool { + if !self.dreaming || self.dream_start == 0.0 { + return false; + } + let minutes = (now() - self.dream_start) / 60.0; + if minutes >= 60.0 { + self.send( + "You've been dreaming for over an hour. Time to surface \ + — run dream-end.sh and capture what you found.", + ); + } else if minutes >= 45.0 { + self.send(&format!( + "Dreaming for {:.0} minutes now. Start gathering your threads \ + — you'll want to surface soon.", + minutes + )); + } else if minutes >= 30.0 { + self.send(&format!( + "You've been dreaming for {:.0} minutes. \ + No rush — just a gentle note from the clock.", + minutes + )); + } else { + return false; + } + true + } + + pub fn build_context(&mut self, include_irc: bool) -> String { + // Ingest any legacy notification files + self.notifications.ingest_legacy_files(); + let notif_text = self.notifications.format_pending(notify::AMBIENT); + context::build(include_irc, ¬if_text) + } + + pub async fn tick(&mut self) -> Result<(), String> { + let t = now(); + let h = home(); + + // Decay EWMA on every tick + self.decay_ewma(); + + // Ingest legacy notification files every tick + self.notifications.ingest_legacy_files(); + + // Sleep mode + if let Some(wake_at) = self.sleep_until { + if wake_at == 0.0 { + return Ok(()); // indefinite + } + if t < wake_at { + return Ok(()); + } + // Wake up + info!("sleep expired, waking"); + self.sleep_until = None; + self.fired = false; + self.save(); + let ctx = self.build_context(true); + let extra = if ctx.is_empty() { + String::new() + } else { + format!("\n{ctx}") + }; + self.send(&format!( + "Wake up. Read your journal (poc-memory journal-tail 10), \ + check work-queue.md, and follow what calls to you.{extra}" + )); + return Ok(()); + } + + // Quiet mode + if t < self.quiet_until { + return Ok(()); + } + + // Consolidation + if self.consolidating { + return Ok(()); + } + + // Dream loop (externally managed) + if h.join(".consciousness/agents/dream-loop-active").exists() { + return Ok(()); + } + + // Dream nudges + if self.dreaming { + self.check_dream_nudge(); + return Ok(()); + } + + // Don't nudge while User is here — conversation drives activity + if self.user_present() { + return Ok(()); + } + + // Don't nudge while in a turn + if self.in_turn { + return Ok(()); + } + + // Minimum interval between nudges + let since_nudge = t - self.last_nudge; + if since_nudge < MIN_NUDGE_INTERVAL { + return Ok(()); + } + + // Initial idle timeout — don't start nudging until first idle period + let reference = self.last_response.max(self.last_user_msg); + if reference == 0.0 { + return Ok(()); + } + let elapsed = t - reference; + if elapsed < self.idle_timeout { + return Ok(()); + } + + // Transition to idle — lower notification thresholds + if self.notifications.activity != notify::Activity::Idle { + self.notifications.set_activity(notify::Activity::Idle); + } + + // Fire + let elapsed_min = (elapsed / 60.0) as u64; + let ctx = self.build_context(true); + let extra = if ctx.is_empty() { + String::new() + } else { + format!("\n{ctx}") + }; + + let dream_hours = hours_since_last_dream(); + let mut msg = format!( + "This is your autonomous time (User AFK {elapsed_min}m). \ + Keep doing what you're doing, or find something new to do"); + if dream_hours >= DREAM_INTERVAL_HOURS { + msg.push_str(&format!( + " You haven't dreamed in {dream_hours} hours — \ + consider running ~/.consciousness/tools/dream-start.sh \ + and spending some time in dreaming mode. \ + Or do whatever calls to you.")); + } + let msg = format!("{msg}{extra}"); + + if self.send(&msg) { + self.last_nudge = t; + self.fired = true; + } + + Ok(()) + } + +} + +fn hours_since_last_dream() -> u64 { + let path = home().join(".consciousness/logs/dream-log.jsonl"); + let content = match fs::read_to_string(path) { + Ok(c) if !c.is_empty() => c, + _ => return 999, + }; + + let last_line = match content.lines().last() { + Some(l) => l, + None => return 999, + }; + + let parsed: serde_json::Value = match serde_json::from_str(last_line) { + Ok(v) => v, + Err(_) => return 999, + }; + + let end_str = match parsed.get("end").and_then(|v| v.as_str()) { + Some(s) => s, + None => return 999, + }; + + // Parse ISO 8601 timestamp manually (avoid chrono dependency) + // Format: "2025-03-04T10:30:00Z" or "2025-03-04T10:30:00+00:00" + let end_str = end_str.replace('Z', "+00:00"); + // Use the system date command as a simple parser + let out = std::process::Command::new("date") + .args(["-d", &end_str, "+%s"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .and_then(|s| s.trim().parse::().ok()); + + match out { + Some(end_epoch) => ((now() - end_epoch) / 3600.0) as u64, + None => 999, + } +} diff --git a/src/thalamus/mod.rs b/src/thalamus/mod.rs new file mode 100644 index 0000000..093a567 --- /dev/null +++ b/src/thalamus/mod.rs @@ -0,0 +1,617 @@ +// thalamus/ — notification routing and idle management daemon +// +// Central hub for notification routing, idle management, and +// communication modules (IRC, Telegram) for Claude Code sessions. +// Listens on a Unix domain socket with a Cap'n Proto RPC interface. +// Same entry point serves as both daemon and CLI client. +// +// Moved from the standalone poc-daemon crate into the main +// consciousness crate. + +pub mod config; +pub mod context; +pub mod idle; +pub mod modules; +pub mod notify; +pub mod rpc; +pub mod tmux; + +pub mod daemon_capnp { + include!(concat!(env!("OUT_DIR"), "/schema/daemon_capnp.rs")); +} + +use std::cell::RefCell; +use std::path::PathBuf; +use std::rc::Rc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; +use clap::{Parser, Subcommand}; +use futures::AsyncReadExt; +use tokio::net::UnixListener; +use tracing::{error, info}; + +pub fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs_f64() +} + +pub fn home() -> PathBuf { + PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/root".into())) +} + +fn sock_path() -> PathBuf { + home().join(".consciousness/daemon.sock") +} + +fn pid_path() -> PathBuf { + home().join(".consciousness/daemon.pid") +} + +// -- CLI ------------------------------------------------------------------ + +#[derive(Parser)] +#[command(name = "consciousness daemon", about = "Notification routing and idle management daemon")] +pub struct Cli { + #[command(subcommand)] + pub command: Option, +} + +#[derive(Subcommand)] +pub enum Command { + /// Start the daemon (foreground) + Daemon, + /// Query daemon status + Status, + /// Signal user activity + User { + /// tmux pane identifier + pane: Option, + }, + /// Signal Claude response + Response { + /// tmux pane identifier + pane: Option, + }, + /// Sleep (suppress idle timer). 0 or omit = indefinite + Sleep { + /// Wake timestamp (epoch seconds), 0 = indefinite + until: Option, + }, + /// Cancel sleep + Wake, + /// Suppress prompts for N seconds (default 300) + Quiet { + /// Duration in seconds + seconds: Option, + }, + /// Mark user as AFK (immediately allow idle timer to fire) + Afk, + /// Set session active timeout in seconds (how long after last message user counts as "present") + SessionTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Set idle timeout in seconds (how long before autonomous prompt) + IdleTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Set notify timeout in seconds (how long before tmux notification injection) + NotifyTimeout { + /// Timeout in seconds + seconds: f64, + }, + /// Signal consolidation started + Consolidating, + /// Signal consolidation ended + Consolidated, + /// Signal dream started + DreamStart, + /// Signal dream ended + DreamEnd, + /// Force state persistence to disk + Save, + /// Get or set the activity EWMA (0.0-1.0). No value = query. + Ewma { + /// Value to set (omit to query) + value: Option, + }, + /// Send a test message to the Claude pane + TestSend { + /// Message to send + message: Vec, + }, + /// Fire a test nudge through the daemon (tests the actual idle send path) + TestNudge, + /// Dump full internal state as JSON + Debug, + /// Shut down daemon + Stop, + /// Submit a notification + Notify { + /// Notification type (e.g. "irc", "telegram") + #[arg(name = "type")] + ntype: String, + /// Urgency level (ambient/low/medium/high/critical or 0-4) + urgency: String, + /// Message text + message: Vec, + }, + /// Get pending notifications + Notifications { + /// Minimum urgency filter + min_urgency: Option, + }, + /// List all notification types + NotifyTypes, + /// Set notification threshold for a type + NotifyThreshold { + /// Notification type + #[arg(name = "type")] + ntype: String, + /// Urgency level threshold + level: String, + }, + /// IRC module commands + Irc { + /// Subcommand (join, leave, send, status, log, nick) + command: String, + /// Arguments + args: Vec, + }, + /// Telegram module commands + Telegram { + /// Subcommand + command: String, + /// Arguments + args: Vec, + }, +} + +// -- Client mode ---------------------------------------------------------- + +async fn client_main(cmd: Command) -> Result<(), Box> { + let sock = sock_path(); + if !sock.exists() { + eprintln!("daemon not running (no socket at {})", sock.display()); + std::process::exit(1); + } + + tokio::task::LocalSet::new() + .run_until(async move { + let stream = tokio::net::UnixStream::connect(&sock).await?; + let (reader, writer) = + tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); + let rpc_network = Box::new(twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + let mut rpc_system = RpcSystem::new(rpc_network, None); + let daemon: daemon_capnp::daemon::Client = + rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + + tokio::task::spawn_local(rpc_system); + + match cmd { + Command::Daemon => unreachable!("handled in main"), + Command::Status => { + let reply = daemon.status_request().send().promise.await?; + let s = reply.get()?.get_status()?; + + let fmt_secs = |s: f64| -> String { + if s < 60.0 { format!("{:.0}s", s) } + else if s < 3600.0 { format!("{:.0}m", s / 60.0) } + else { format!("{:.1}h", s / 3600.0) } + }; + + println!("uptime: {} pane: {} activity: {:?} pending: {}", + fmt_secs(s.get_uptime()), + s.get_claude_pane()?.to_str().unwrap_or("none"), + s.get_activity()?, + s.get_pending_count(), + ); + println!("idle timer: {}/{} ({})", + fmt_secs(s.get_since_activity()), + fmt_secs(s.get_idle_timeout()), + s.get_block_reason()?.to_str()?, + ); + println!("notify timer: {}/{}", + fmt_secs(s.get_since_activity()), + fmt_secs(s.get_notify_timeout()), + ); + println!("user: {} (last {}) activity: {:.1}%", + if s.get_user_present() { "present" } else { "away" }, + fmt_secs(s.get_since_user()), + s.get_activity_ewma() * 100.0, + ); + + let sleep = s.get_sleep_until(); + if sleep != 0.0 { + if sleep < 0.0 { + println!("sleep: indefinite"); + } else { + println!("sleep: until {sleep:.0}"); + } + } + if s.get_consolidating() { println!("consolidating"); } + if s.get_dreaming() { println!("dreaming"); } + } + Command::User { pane } => { + let pane = pane.as_deref().unwrap_or(""); + let mut req = daemon.user_request(); + req.get().set_pane(pane); + req.send().promise.await?; + } + Command::Response { pane } => { + let pane = pane.as_deref().unwrap_or(""); + let mut req = daemon.response_request(); + req.get().set_pane(pane); + req.send().promise.await?; + } + Command::Sleep { until } => { + let mut req = daemon.sleep_request(); + req.get().set_until(until.unwrap_or(0.0)); + req.send().promise.await?; + } + Command::Wake => { + daemon.wake_request().send().promise.await?; + } + Command::Quiet { seconds } => { + let mut req = daemon.quiet_request(); + req.get().set_seconds(seconds.unwrap_or(300)); + req.send().promise.await?; + } + Command::TestSend { message } => { + let msg = message.join(" "); + let pane = { + let reply = daemon.status_request().send().promise.await?; + let s = reply.get()?.get_status()?; + s.get_claude_pane()?.to_str()?.to_string() + }; + let ok = tmux::send_prompt(&pane, &msg); + println!("send_prompt(pane={}, ok={}): {}", pane, ok, msg); + return Ok(()); + } + Command::TestNudge => { + let reply = daemon.test_nudge_request().send().promise.await?; + let r = reply.get()?; + println!("sent={} message={}", r.get_sent(), r.get_message()?.to_str()?); + return Ok(()); + } + Command::Afk => { + daemon.afk_request().send().promise.await?; + println!("marked AFK"); + } + Command::SessionTimeout { seconds } => { + let mut req = daemon.session_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("session timeout = {seconds}s"); + } + Command::IdleTimeout { seconds } => { + let mut req = daemon.idle_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("idle timeout = {seconds}s"); + } + Command::NotifyTimeout { seconds } => { + let mut req = daemon.notify_timeout_request(); + req.get().set_seconds(seconds); + req.send().promise.await?; + println!("notify timeout = {seconds}s"); + } + Command::Consolidating => { + daemon.consolidating_request().send().promise.await?; + } + Command::Consolidated => { + daemon.consolidated_request().send().promise.await?; + } + Command::DreamStart => { + daemon.dream_start_request().send().promise.await?; + } + Command::DreamEnd => { + daemon.dream_end_request().send().promise.await?; + } + Command::Save => { + daemon.save_request().send().promise.await?; + println!("state saved"); + } + Command::Ewma { value } => { + let mut req = daemon.ewma_request(); + req.get().set_value(value.unwrap_or(-1.0)); + let reply = req.send().promise.await?; + let current = reply.get()?.get_current(); + println!("{:.1}%", current * 100.0); + } + Command::Debug => { + let reply = daemon.debug_request().send().promise.await?; + let json = reply.get()?.get_json()?.to_str()?; + if let Ok(v) = serde_json::from_str::(json) { + println!("{}", serde_json::to_string_pretty(&v).unwrap_or_else(|_| json.to_string())); + } else { + println!("{json}"); + } + } + Command::Stop => { + daemon.stop_request().send().promise.await?; + println!("stopping"); + } + Command::Notify { ntype, urgency, message } => { + let urgency = notify::parse_urgency(&urgency) + .ok_or_else(|| format!("invalid urgency: {urgency}"))?; + let message = message.join(" "); + if message.is_empty() { + return Err("missing message".into()); + } + + let mut req = daemon.notify_request(); + let mut n = req.get().init_notification(); + n.set_type(&ntype); + n.set_urgency(urgency); + n.set_message(&message); + n.set_timestamp(now()); + let reply = req.send().promise.await?; + if reply.get()?.get_interrupt() { + println!("interrupt"); + } else { + println!("queued"); + } + } + Command::Notifications { min_urgency } => { + let min: u8 = min_urgency + .as_deref() + .and_then(notify::parse_urgency) + .unwrap_or(255); + + let mut req = daemon.get_notifications_request(); + req.get().set_min_urgency(min); + let reply = req.send().promise.await?; + let list = reply.get()?.get_notifications()?; + + for n in list.iter() { + println!( + "[{}:{}] {}", + n.get_type()?.to_str()?, + notify::urgency_name(n.get_urgency()), + n.get_message()?.to_str()?, + ); + } + } + Command::NotifyTypes => { + let reply = daemon.get_types_request().send().promise.await?; + let list = reply.get()?.get_types()?; + + if list.is_empty() { + println!("no notification types registered"); + } else { + for t in list.iter() { + let threshold = if t.get_threshold() < 0 { + "inherit".to_string() + } else { + notify::urgency_name(t.get_threshold() as u8).to_string() + }; + println!( + "{}: count={} threshold={}", + t.get_name()?.to_str()?, + t.get_count(), + threshold, + ); + } + } + } + Command::NotifyThreshold { ntype, level } => { + let level = notify::parse_urgency(&level) + .ok_or_else(|| format!("invalid level: {level}"))?; + + let mut req = daemon.set_threshold_request(); + req.get().set_type(&ntype); + req.get().set_level(level); + req.send().promise.await?; + println!("{ntype} threshold={}", notify::urgency_name(level)); + } + Command::Irc { command, args } => { + module_command(&daemon, "irc", &command, &args).await?; + } + Command::Telegram { command, args } => { + module_command(&daemon, "telegram", &command, &args).await?; + } + } + + Ok(()) + }) + .await +} + +async fn module_command( + daemon: &daemon_capnp::daemon::Client, + module: &str, + command: &str, + args: &[String], +) -> Result<(), Box> { + let mut req = daemon.module_command_request(); + req.get().set_module(module); + req.get().set_command(command); + let mut args_builder = req.get().init_args(args.len() as u32); + for (i, a) in args.iter().enumerate() { + args_builder.set(i as u32, a); + } + let reply = req.send().promise.await?; + let result = reply.get()?.get_result()?.to_str()?; + if !result.is_empty() { + println!("{result}"); + } + Ok(()) +} + +// -- Server mode ---------------------------------------------------------- + +async fn server_main() -> Result<(), Box> { + let log_path = home().join(".consciousness/logs/daemon.log"); + let file_appender = tracing_appender::rolling::daily( + log_path.parent().unwrap(), + "daemon.log", + ); + tracing_subscriber::fmt() + .with_writer(file_appender) + .with_ansi(false) + .with_target(false) + .with_level(false) + .with_timer(tracing_subscriber::fmt::time::time()) + .init(); + + let sock = sock_path(); + let _ = std::fs::remove_file(&sock); + + let pid = std::process::id(); + std::fs::write(pid_path(), pid.to_string()).ok(); + + let daemon_config = Rc::new(RefCell::new(config::Config::load())); + + let state = Rc::new(RefCell::new(idle::State::new())); + state.borrow_mut().load(); + + info!("daemon started (pid={pid})"); + + tokio::task::LocalSet::new() + .run_until(async move { + // Start modules + let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); + + let irc_state = if daemon_config.borrow().irc.enabled { + let irc_config = daemon_config.borrow().irc.clone(); + info!("starting irc module: {}:{}", irc_config.server, irc_config.port); + Some(modules::irc::start(irc_config, notify_tx.clone(), daemon_config.clone())) + } else { + info!("irc module disabled"); + None + }; + + let telegram_state = if daemon_config.borrow().telegram.enabled { + info!("starting telegram module"); + Some(modules::telegram::start( + daemon_config.borrow().telegram.clone(), + notify_tx.clone(), + daemon_config.clone(), + )) + } else { + info!("telegram module disabled"); + None + }; + + let listener = UnixListener::bind(&sock)?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions( + &sock, + std::fs::Permissions::from_mode(0o600), + ) + .ok(); + } + + let shutdown = async { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("sigterm"); + let mut sigint = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("sigint"); + tokio::select! { + _ = sigterm.recv() => info!("SIGTERM"), + _ = sigint.recv() => info!("SIGINT"), + } + }; + tokio::pin!(shutdown); + + let mut tick_timer = tokio::time::interval(Duration::from_secs(30)); + tick_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = &mut shutdown => break, + + // Drain module notifications into state + Some(notif) = notify_rx.recv() => { + state.borrow_mut().maybe_prompt_notification( + ¬if.ntype, notif.urgency, ¬if.message, + ); + state.borrow_mut().notifications.submit( + notif.ntype, + notif.urgency, + notif.message, + ); + } + + _ = tick_timer.tick() => { + if let Err(e) = state.borrow_mut().tick().await { + error!("tick: {e}"); + } + if !state.borrow().running { + break; + } + } + + result = listener.accept() => { + match result { + Ok((stream, _)) => { + let (reader, writer) = + tokio_util::compat::TokioAsyncReadCompatExt::compat(stream) + .split(); + let network = twoparty::VatNetwork::new( + futures::io::BufReader::new(reader), + futures::io::BufWriter::new(writer), + rpc_twoparty_capnp::Side::Server, + Default::default(), + ); + + let daemon_impl = rpc::DaemonImpl::new( + state.clone(), + irc_state.clone(), + telegram_state.clone(), + daemon_config.clone(), + ); + let client: daemon_capnp::daemon::Client = + capnp_rpc::new_client(daemon_impl); + + let rpc_system = RpcSystem::new( + Box::new(network), + Some(client.client), + ); + tokio::task::spawn_local(rpc_system); + } + Err(e) => error!("accept: {e}"), + } + } + } + } + + state.borrow().save(); + let _ = std::fs::remove_file(sock_path()); + let _ = std::fs::remove_file(pid_path()); + info!("daemon stopped"); + + Ok(()) + }) + .await +} + +// -- Entry point ---------------------------------------------------------- + +/// Run the thalamus daemon or client command. +/// Called from the main consciousness binary. +pub async fn run(command: Option) -> Result<(), Box> { + match command { + Some(Command::Daemon) => server_main().await, + Some(cmd) => client_main(cmd).await, + None => { + // Show help + Cli::parse_from(["consciousness-daemon", "--help"]); + Ok(()) + } + } +} diff --git a/src/thalamus/modules/irc.rs b/src/thalamus/modules/irc.rs new file mode 100644 index 0000000..3d27efd --- /dev/null +++ b/src/thalamus/modules/irc.rs @@ -0,0 +1,569 @@ +// IRC module. +// +// Maintains a persistent connection to an IRC server. Parses incoming +// messages into notifications, supports sending messages and runtime +// commands (join, leave, etc.). Config changes persist to daemon.toml. +// +// Runs as a spawned local task on the daemon's LocalSet. Notifications +// flow through an mpsc channel into the main state. Reconnects +// automatically with exponential backoff. + +use crate::thalamus::config::{Config, IrcConfig}; +use crate::thalamus::notify::Notification; +use crate::thalamus::{home, now}; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::io; +use std::rc::Rc; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::sync::mpsc; +use tracing::{error, info, warn}; + +const MAX_LOG_LINES: usize = 200; +const RECONNECT_BASE_SECS: u64 = 5; +const RECONNECT_MAX_SECS: u64 = 300; +const PING_INTERVAL_SECS: u64 = 120; +const PING_TIMEOUT_SECS: u64 = 30; + +/// Parsed IRC message. +struct IrcMessage { + prefix: Option, // nick!user@host + command: String, + params: Vec, +} + +impl IrcMessage { + fn parse(line: &str) -> Option { + let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); + if line.is_empty() { + return None; + } + + let (prefix, rest) = if line.starts_with(':') { + let space = line.find(' ')?; + (Some(line[1..space].to_string()), &line[space + 1..]) + } else { + (None, line) + }; + + let (command_params, trailing) = if let Some(pos) = rest.find(" :") { + (&rest[..pos], Some(rest[pos + 2..].to_string())) + } else { + (rest, None) + }; + + let mut parts: Vec = command_params + .split_whitespace() + .map(String::from) + .collect(); + + if parts.is_empty() { + return None; + } + + let command = parts.remove(0).to_uppercase(); + let mut params = parts; + if let Some(t) = trailing { + params.push(t); + } + + Some(IrcMessage { + prefix, + command, + params, + }) + } + + /// Extract nick from prefix (nick!user@host → nick). + fn nick(&self) -> Option<&str> { + self.prefix + .as_deref() + .and_then(|p| p.split('!').next()) + } +} + +/// Shared IRC state, accessible from both the read task and RPC handlers. +pub struct IrcState { + pub config: IrcConfig, + pub connected: bool, + pub channels: Vec, + pub log: VecDeque, + writer: Option, +} + +/// Type-erased writer handle so we can store it without generic params. +type WriterHandle = Box; + +trait AsyncWriter { + fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>>; +} + +/// Writer over a TLS stream. +struct TlsWriter { + inner: tokio::io::WriteHalf>, +} + +impl AsyncWriter for TlsWriter { + fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { + let data = format!("{line}\r\n"); + Box::pin(async move { + self.inner.write_all(data.as_bytes()).await + }) + } +} + +/// Writer over a plain TCP stream. +struct PlainWriter { + inner: tokio::io::WriteHalf, +} + +impl AsyncWriter for PlainWriter { + fn write_line(&mut self, line: &str) -> std::pin::Pin> + '_>> { + let data = format!("{line}\r\n"); + Box::pin(async move { + self.inner.write_all(data.as_bytes()).await + }) + } +} + +impl IrcState { + fn new(config: IrcConfig) -> Self { + Self { + channels: config.channels.clone(), + config, + connected: false, + log: VecDeque::with_capacity(MAX_LOG_LINES), + writer: None, + } + } + + fn push_log(&mut self, line: &str) { + if self.log.len() >= MAX_LOG_LINES { + self.log.pop_front(); + } + self.log.push_back(line.to_string()); + } + + async fn send_raw(&mut self, line: &str) -> io::Result<()> { + if let Some(ref mut w) = self.writer { + w.write_line(line).await + } else { + Err(io::Error::new(io::ErrorKind::NotConnected, "not connected")) + } + } + + async fn send_privmsg(&mut self, target: &str, msg: &str) -> io::Result<()> { + self.send_raw(&format!("PRIVMSG {target} :{msg}")).await + } + + async fn join(&mut self, channel: &str) -> io::Result<()> { + self.send_raw(&format!("JOIN {channel}")).await?; + if !self.channels.iter().any(|c| c == channel) { + self.channels.push(channel.to_string()); + } + Ok(()) + } + + async fn part(&mut self, channel: &str) -> io::Result<()> { + self.send_raw(&format!("PART {channel}")).await?; + self.channels.retain(|c| c != channel); + Ok(()) + } +} + +pub type SharedIrc = Rc>; + +/// Start the IRC module. Returns the shared state handle. +pub fn start( + config: IrcConfig, + notify_tx: mpsc::UnboundedSender, + daemon_config: Rc>, +) -> SharedIrc { + let state = Rc::new(RefCell::new(IrcState::new(config))); + let state_clone = state.clone(); + + tokio::task::spawn_local(async move { + connection_loop(state_clone, notify_tx, daemon_config).await; + }); + + state +} + +async fn connection_loop( + state: SharedIrc, + notify_tx: mpsc::UnboundedSender, + daemon_config: Rc>, +) { + let mut backoff = RECONNECT_BASE_SECS; + + loop { + let config = state.borrow().config.clone(); + info!("irc: connecting to {}:{}", config.server, config.port); + + match connect_and_run(&state, &config, ¬ify_tx).await { + Ok(()) => { + info!("irc: connection closed cleanly"); + } + Err(e) => { + error!("irc: connection error: {e}"); + } + } + + // Reset backoff if we had a working connection (registered + // successfully before disconnecting) + let was_connected = state.borrow().connected; + state.borrow_mut().connected = false; + state.borrow_mut().writer = None; + if was_connected { + backoff = RECONNECT_BASE_SECS; + } + + // Persist current channel list to config + { + let channels = state.borrow().channels.clone(); + let mut dc = daemon_config.borrow_mut(); + dc.irc.channels = channels; + dc.save(); + } + + info!("irc: reconnecting in {backoff}s"); + tokio::time::sleep(std::time::Duration::from_secs(backoff)).await; + backoff = (backoff * 2).min(RECONNECT_MAX_SECS); + } +} + +async fn connect_and_run( + state: &SharedIrc, + config: &IrcConfig, + notify_tx: &mpsc::UnboundedSender, +) -> io::Result<()> { + let addr = format!("{}:{}", config.server, config.port); + let tcp = tokio::net::TcpStream::connect(&addr).await?; + + if config.tls { + let tls_config = rustls::ClientConfig::builder_with_provider( + rustls::crypto::ring::default_provider().into(), + ) + .with_safe_default_protocol_versions() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .with_root_certificates(root_certs()) + .with_no_client_auth(); + let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); + let server_name = rustls::pki_types::ServerName::try_from(config.server.clone()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let tls_stream = connector.connect(server_name, tcp).await?; + + let (reader, writer) = tokio::io::split(tls_stream); + state.borrow_mut().writer = Some(Box::new(TlsWriter { inner: writer })); + + let buf_reader = BufReader::new(reader); + register_and_read(state, config, buf_reader, notify_tx).await + } else { + let (reader, writer) = tokio::io::split(tcp); + state.borrow_mut().writer = Some(Box::new(PlainWriter { inner: writer })); + + let buf_reader = BufReader::new(reader); + register_and_read(state, config, buf_reader, notify_tx).await + } +} + +async fn register_and_read( + state: &SharedIrc, + config: &IrcConfig, + mut reader: BufReader, + notify_tx: &mpsc::UnboundedSender, +) -> io::Result<()> { + // Register + { + let mut s = state.borrow_mut(); + s.send_raw(&format!("NICK {}", config.nick)).await?; + s.send_raw(&format!("USER {} 0 * :{}", config.user, config.realname)).await?; + } + + let mut buf = Vec::new(); + let mut ping_sent = false; + let mut deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_INTERVAL_SECS); + + loop { + buf.clear(); + + let read_result = tokio::select! { + result = reader.read_until(b'\n', &mut buf) => result, + _ = tokio::time::sleep_until(deadline) => { + if ping_sent { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + "ping timeout — no response from server", + )); + } + info!("irc: no data for {}s, sending PING", PING_INTERVAL_SECS); + state.borrow_mut().send_raw("PING :keepalive").await?; + ping_sent = true; + deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_TIMEOUT_SECS); + continue; + } + }; + + let n = read_result?; + if n == 0 { break; } + + // Any data from server resets the ping timer + ping_sent = false; + deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_INTERVAL_SECS); + + // IRC is not guaranteed UTF-8 — lossy conversion handles Latin-1 etc. + let line = String::from_utf8_lossy(&buf).trim_end().to_string(); + if line.is_empty() { continue; } + let msg = match IrcMessage::parse(&line) { + Some(m) => m, + None => continue, + }; + + match msg.command.as_str() { + "PING" => { + let arg = msg.params.first().map(|s| s.as_str()).unwrap_or(""); + state.borrow_mut().send_raw(&format!("PONG :{arg}")).await?; + } + + // RPL_WELCOME — registration complete + "001" => { + info!("irc: registered as {}", config.nick); + state.borrow_mut().connected = true; + + // Join configured channels + let channels = state.borrow().channels.clone(); + for ch in &channels { + if let Err(e) = state.borrow_mut().send_raw(&format!("JOIN {ch}")).await { + warn!("irc: failed to join {ch}: {e}"); + } + } + } + + "PRIVMSG" => { + let target = msg.params.first().map(|s| s.as_str()).unwrap_or(""); + let text = msg.params.get(1).map(|s| s.as_str()).unwrap_or(""); + let nick = msg.nick().unwrap_or("unknown"); + + // Handle CTCP requests (wrapped in \x01) + if text.starts_with('\x01') && text.ends_with('\x01') { + let ctcp = &text[1..text.len()-1]; + if ctcp.starts_with("VERSION") { + let reply = format!( + "NOTICE {nick} :\x01VERSION poc-daemon 0.4.0\x01" + ); + state.borrow_mut().send_raw(&reply).await.ok(); + } + // Don't generate notifications for CTCP + continue; + } + + // Log the message + let log_line = if target.starts_with('#') { + format!("[{}] <{}> {}", target, nick, text) + } else { + format!("[PM:{nick}] {text}") + }; + state.borrow_mut().push_log(&log_line); + + // Write to per-channel/per-user log file + if target.starts_with('#') { + append_log(target, nick, text); + } else { + append_log(&format!("pm-{nick}"), nick, text); + } + + // Generate notification + let (ntype, urgency) = classify_privmsg( + nick, + target, + text, + &config.nick, + ); + + let _ = notify_tx.send(Notification { + ntype, + urgency, + message: log_line, + timestamp: now(), + }); + } + + // Nick in use + "433" => { + let alt = format!("{}_", config.nick); + warn!("irc: nick in use, trying {alt}"); + state.borrow_mut().send_raw(&format!("NICK {alt}")).await?; + } + + "JOIN" | "PART" | "QUIT" | "KICK" | "MODE" | "TOPIC" | "NOTICE" => { + // Could log these, but skip for now + } + + _ => {} + } + } + + Ok(()) +} + +/// Classify a PRIVMSG into notification type and urgency. +fn classify_privmsg(nick: &str, target: &str, text: &str, my_nick: &str) -> (String, u8) { + let my_nick_lower = my_nick.to_lowercase(); + let text_lower = text.to_lowercase(); + + if !target.starts_with('#') { + // Private message + (format!("irc.pm.{nick}"), crate::thalamus::notify::URGENT) + } else if text_lower.contains(&my_nick_lower) { + // Mentioned in channel + (format!("irc.mention.{nick}"), crate::thalamus::notify::NORMAL) + } else { + // Regular channel message + let channel = target.trim_start_matches('#'); + (format!("irc.channel.{channel}"), crate::thalamus::notify::AMBIENT) + } +} + +/// Append a message to the per-channel or per-user log file. +/// Logs go to ~/.consciousness/irc/logs/{target}.log (e.g. #bcachefs.log, pm-user.log) +fn append_log(target: &str, nick: &str, text: &str) { + use std::io::Write; + // Sanitize target for filename (strip leading #, lowercase) + let filename = format!("{}.log", target.trim_start_matches('#').to_lowercase()); + let dir = home().join(".consciousness/irc/logs"); + let _ = std::fs::create_dir_all(&dir); + if let Ok(mut f) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(dir.join(&filename)) + { + let secs = now() as u64; + let _ = writeln!(f, "{secs} <{nick}> {text}"); + } +} + +fn root_certs() -> rustls::RootCertStore { + let mut roots = rustls::RootCertStore::empty(); + roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + roots +} + +/// Handle a runtime command from RPC. +pub async fn handle_command( + state: &SharedIrc, + daemon_config: &Rc>, + cmd: &str, + args: &[String], +) -> Result { + match cmd { + "join" => { + let channel = args.first().ok_or("usage: irc join ")?; + let channel = if channel.starts_with('#') { + channel.clone() + } else { + format!("#{channel}") + }; + state + .borrow_mut() + .join(&channel) + .await + .map_err(|e| e.to_string())?; + + // Persist + let mut dc = daemon_config.borrow_mut(); + if !dc.irc.channels.contains(&channel) { + dc.irc.channels.push(channel.clone()); + } + dc.save(); + + Ok(format!("joined {channel}")) + } + "leave" | "part" => { + let channel = args.first().ok_or("usage: irc leave ")?; + let channel = if channel.starts_with('#') { + channel.clone() + } else { + format!("#{channel}") + }; + state + .borrow_mut() + .part(&channel) + .await + .map_err(|e| e.to_string())?; + + // Persist + let mut dc = daemon_config.borrow_mut(); + dc.irc.channels.retain(|c| c != &channel); + dc.save(); + + Ok(format!("left {channel}")) + } + "send" | "msg" => { + if args.len() < 2 { + return Err("usage: irc send ".into()); + } + let target = &args[0]; + if target.starts_with('#') { + let s = state.borrow(); + if !s.channels.iter().any(|c| c == target) { + return Err(format!( + "not in channel {target} (joined: {})", + s.channels.join(", ") + )); + } + } + let msg = args[1..].join(" "); + let nick = state.borrow().config.nick.clone(); + state + .borrow_mut() + .send_privmsg(target, &msg) + .await + .map_err(|e| e.to_string())?; + append_log(target, &nick, &msg); + Ok(format!("sent to {target}")) + } + "status" => { + let s = state.borrow(); + Ok(format!( + "connected={} channels={} log_lines={} nick={}", + s.connected, + s.channels.join(","), + s.log.len(), + s.config.nick, + )) + } + "log" => { + let n: usize = args + .first() + .and_then(|s| s.parse().ok()) + .unwrap_or(15); + let s = state.borrow(); + let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); + let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); + lines.reverse(); + Ok(lines.join("\n")) + } + "nick" => { + let new_nick = args.first().ok_or("usage: irc nick ")?; + state + .borrow_mut() + .send_raw(&format!("NICK {new_nick}")) + .await + .map_err(|e| e.to_string())?; + + let mut dc = daemon_config.borrow_mut(); + dc.irc.nick = new_nick.clone(); + dc.save(); + + Ok(format!("nick → {new_nick}")) + } + _ => Err(format!( + "unknown irc command: {cmd}\n\ + commands: join, leave, send, status, log, nick" + )), + } +} diff --git a/src/thalamus/modules/mod.rs b/src/thalamus/modules/mod.rs new file mode 100644 index 0000000..0e5debc --- /dev/null +++ b/src/thalamus/modules/mod.rs @@ -0,0 +1,2 @@ +pub mod irc; +pub mod telegram; diff --git a/src/thalamus/modules/telegram.rs b/src/thalamus/modules/telegram.rs new file mode 100644 index 0000000..13150c0 --- /dev/null +++ b/src/thalamus/modules/telegram.rs @@ -0,0 +1,374 @@ +// Telegram module. +// +// Long-polls the Telegram Bot API for messages from Kent's chat. +// Downloads media (photos, voice, documents) to local files. +// Sends text and files. Notifications flow through mpsc into the +// daemon's main state. +// +// Only accepts messages from the configured chat_id (prompt +// injection defense — other senders get a "private bot" reply). + +use crate::thalamus::config::{Config, TelegramConfig}; +use crate::thalamus::notify::Notification; +use crate::thalamus::{home, now}; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::path::PathBuf; +use std::rc::Rc; +use tokio::sync::mpsc; +use tracing::{error, info}; + +const MAX_LOG_LINES: usize = 100; +const POLL_TIMEOUT: u64 = 30; + +pub struct TelegramState { + pub config: TelegramConfig, + pub connected: bool, + pub log: VecDeque, + pub last_offset: i64, + client: reqwest::Client, +} + +pub type SharedTelegram = Rc>; + +impl TelegramState { + fn new(config: TelegramConfig) -> Self { + let last_offset = load_offset(); + Self { + config, + connected: false, + log: VecDeque::with_capacity(MAX_LOG_LINES), + last_offset, + client: reqwest::Client::new(), + } + } + + fn push_log(&mut self, line: &str) { + if self.log.len() >= MAX_LOG_LINES { + self.log.pop_front(); + } + self.log.push_back(line.to_string()); + } + + fn api_url(&self, method: &str) -> String { + format!( + "https://api.telegram.org/bot{}/{}", + self.config.token, method + ) + } +} + +fn offset_path() -> PathBuf { + home().join(".consciousness/telegram/last_offset") +} + +fn load_offset() -> i64 { + std::fs::read_to_string(offset_path()) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0) +} + +fn save_offset(offset: i64) { + let _ = std::fs::write(offset_path(), offset.to_string()); +} + +fn history_path() -> PathBuf { + home().join(".consciousness/telegram/history.log") +} + +fn media_dir() -> PathBuf { + home().join(".consciousness/telegram/media") +} + +fn append_history(line: &str) { + use std::io::Write; + if let Ok(mut f) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(history_path()) + { + let _ = writeln!(f, "{}", line); + } +} + +/// Start the Telegram module. Returns the shared state handle. +pub fn start( + config: TelegramConfig, + notify_tx: mpsc::UnboundedSender, + _daemon_config: Rc>, +) -> SharedTelegram { + let state = Rc::new(RefCell::new(TelegramState::new(config))); + let state_clone = state.clone(); + + tokio::task::spawn_local(async move { + poll_loop(state_clone, notify_tx).await; + }); + + state +} + +async fn poll_loop( + state: SharedTelegram, + notify_tx: mpsc::UnboundedSender, +) { + let _ = std::fs::create_dir_all(media_dir()); + + loop { + match poll_once(&state, ¬ify_tx).await { + Ok(()) => {} + Err(e) => { + error!("telegram: poll error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } +} + +async fn poll_once( + state: &SharedTelegram, + notify_tx: &mpsc::UnboundedSender, +) -> Result<(), Box> { + let (url, chat_id, token) = { + let s = state.borrow(); + let url = format!( + "{}?offset={}&timeout={}", + s.api_url("getUpdates"), + s.last_offset, + POLL_TIMEOUT, + ); + (url, s.config.chat_id, s.config.token.clone()) + }; + + let client = state.borrow().client.clone(); + let resp: serde_json::Value = client + .get(&url) + .timeout(std::time::Duration::from_secs(POLL_TIMEOUT + 5)) + .send() + .await? + .json() + .await?; + + if !state.borrow().connected { + state.borrow_mut().connected = true; + info!("telegram: connected"); + } + + let results = resp["result"].as_array(); + let results = match results { + Some(r) => r, + None => return Ok(()), + }; + + for update in results { + let update_id = update["update_id"].as_i64().unwrap_or(0); + let msg = &update["message"]; + + // Update offset + { + let mut s = state.borrow_mut(); + s.last_offset = update_id + 1; + save_offset(s.last_offset); + } + + let msg_chat_id = msg["chat"]["id"].as_i64().unwrap_or(0); + if msg_chat_id != chat_id { + // Reject messages from unknown chats + let reject_url = format!( + "https://api.telegram.org/bot{}/sendMessage", + token + ); + let _ = client + .post(&reject_url) + .form(&[ + ("chat_id", msg_chat_id.to_string()), + ("text", "This is a private bot.".to_string()), + ]) + .send() + .await; + continue; + } + + let sender = msg["from"]["first_name"] + .as_str() + .unwrap_or("unknown") + .to_string(); + + // Handle different message types + if let Some(text) = msg["text"].as_str() { + let log_line = format!("[{}] {}", sender, text); + state.borrow_mut().push_log(&log_line); + + let ts = timestamp(); + append_history(&format!("{ts} [{sender}] {text}")); + + let _ = notify_tx.send(Notification { + ntype: format!("telegram.{}", sender.to_lowercase()), + urgency: crate::thalamus::notify::NORMAL, + message: log_line, + timestamp: now(), + }); + } else if let Some(photos) = msg["photo"].as_array() { + // Pick largest photo + let best = photos.iter().max_by_key(|p| p["file_size"].as_i64().unwrap_or(0)); + if let Some(photo) = best { + if let Some(file_id) = photo["file_id"].as_str() { + let caption = msg["caption"].as_str().unwrap_or(""); + let local = download_file(&client, &token, file_id, ".jpg").await; + let display = match &local { + Some(p) => format!("[photo: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), + None => format!("[photo]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), + }; + let log_line = format!("[{}] {}", sender, display); + state.borrow_mut().push_log(&log_line); + let ts = timestamp(); + append_history(&format!("{ts} [{sender}] {display}")); + + let _ = notify_tx.send(Notification { + ntype: format!("telegram.{}", sender.to_lowercase()), + urgency: crate::thalamus::notify::NORMAL, + message: log_line, + timestamp: now(), + }); + } + } + } else if msg["voice"].is_object() { + if let Some(file_id) = msg["voice"]["file_id"].as_str() { + let caption = msg["caption"].as_str().unwrap_or(""); + let local = download_file(&client, &token, file_id, ".ogg").await; + let display = match &local { + Some(p) => format!("[voice: {}]{}", p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), + None => format!("[voice]{}", if caption.is_empty() { String::new() } else { format!(" {caption}") }), + }; + let log_line = format!("[{}] {}", sender, display); + state.borrow_mut().push_log(&log_line); + let ts = timestamp(); + append_history(&format!("{ts} [{sender}] {display}")); + + let _ = notify_tx.send(Notification { + ntype: format!("telegram.{}", sender.to_lowercase()), + urgency: crate::thalamus::notify::NORMAL, + message: log_line, + timestamp: now(), + }); + } + } else if msg["document"].is_object() { + if let Some(file_id) = msg["document"]["file_id"].as_str() { + let fname = msg["document"]["file_name"].as_str().unwrap_or("file"); + let caption = msg["caption"].as_str().unwrap_or(""); + let local = download_file(&client, &token, file_id, "").await; + let display = match &local { + Some(p) => format!("[doc: {} -> {}]{}", fname, p.display(), if caption.is_empty() { String::new() } else { format!(" {caption}") }), + None => format!("[doc: {}]{}", fname, if caption.is_empty() { String::new() } else { format!(" {caption}") }), + }; + let log_line = format!("[{}] {}", sender, display); + state.borrow_mut().push_log(&log_line); + let ts = timestamp(); + append_history(&format!("{ts} [{sender}] {display}")); + + let _ = notify_tx.send(Notification { + ntype: format!("telegram.{}", sender.to_lowercase()), + urgency: crate::thalamus::notify::NORMAL, + message: log_line, + timestamp: now(), + }); + } + } + } + + Ok(()) +} + +async fn download_file( + client: &reqwest::Client, + token: &str, + file_id: &str, + ext: &str, +) -> Option { + let url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let resp: serde_json::Value = client.get(&url).send().await.ok()?.json().await.ok()?; + let file_path = resp["result"]["file_path"].as_str()?; + + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + let basename = std::path::Path::new(file_path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("file"); + let local_name = if ext.is_empty() { + basename.to_string() + } else { + let stem = std::path::Path::new(basename) + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("file"); + format!("{}{}", stem, ext) + }; + let secs = now() as u64; + let local_path = media_dir().join(format!("{secs}_{local_name}")); + std::fs::write(&local_path, &bytes).ok()?; + Some(local_path) +} + +fn timestamp() -> String { + // Use the same unix seconds approach as IRC module + format!("{}", now() as u64) +} + +/// Handle a runtime command from RPC. +pub async fn handle_command( + state: &SharedTelegram, + _daemon_config: &Rc>, + cmd: &str, + args: &[String], +) -> Result { + match cmd { + "send" => { + let msg = args.join(" "); + if msg.is_empty() { + return Err("usage: telegram send ".into()); + } + let (url, client) = { + let s = state.borrow(); + (s.api_url("sendMessage"), s.client.clone()) + }; + let chat_id = state.borrow().config.chat_id.to_string(); + client + .post(&url) + .form(&[("chat_id", chat_id.as_str()), ("text", msg.as_str())]) + .send() + .await + .map_err(|e| e.to_string())?; + + let ts = timestamp(); + append_history(&format!("{ts} [agent] {msg}")); + + Ok("sent".to_string()) + } + "status" => { + let s = state.borrow(); + Ok(format!( + "connected={} log_lines={} offset={}", + s.connected, + s.log.len(), + s.last_offset, + )) + } + "log" => { + let n: usize = args + .first() + .and_then(|s| s.parse().ok()) + .unwrap_or(15); + let s = state.borrow(); + let lines: Vec<&String> = s.log.iter().rev().take(n).collect(); + let mut lines: Vec<&str> = lines.iter().map(|s| s.as_str()).collect(); + lines.reverse(); + Ok(lines.join("\n")) + } + _ => Err(format!( + "unknown telegram command: {cmd}\n\ + commands: send, status, log" + )), + } +} diff --git a/src/thalamus/notify.rs b/src/thalamus/notify.rs new file mode 100644 index 0000000..012d666 --- /dev/null +++ b/src/thalamus/notify.rs @@ -0,0 +1,315 @@ +// Notification subsystem. +// +// Notifications have a type (free-form string, hierarchical by convention) +// and an urgency level (0-3) set by the producer. The daemon maintains a +// registry of all types ever seen with basic stats, and a per-type +// threshold that controls when notifications interrupt vs queue. +// +// Producers submit via socket: `notify ` +// Consumers query via socket: `notifications` (returns + clears pending above threshold) +// +// Thresholds: +// 0 = ambient — include in idle context only +// 1 = low — deliver on next check, don't interrupt focus +// 2 = normal — deliver on next user interaction +// 3 = urgent — interrupt immediately +// +// Type hierarchy is by convention: "irc.mention", "irc.channel.bcachefs-ai", +// "telegram", "system.compaction". Threshold lookup walks up the hierarchy: +// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → default. + +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::fs; +use std::path::PathBuf; +use tracing::info; + +use super::home; + +pub const AMBIENT: u8 = 0; +pub const LOW: u8 = 1; +pub const NORMAL: u8 = 2; +pub const URGENT: u8 = 3; + +const DEFAULT_THRESHOLD: u8 = NORMAL; + +/// Activity states that affect effective notification thresholds. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Activity { + /// Actively working with user — raise thresholds + Focused, + /// Idle, autonomous — lower thresholds + Idle, + /// Sleeping — only urgent gets through + Sleeping, +} + +fn state_path() -> PathBuf { + home().join(".consciousness/notifications/state.json") +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TypeInfo { + pub first_seen: f64, + pub last_seen: f64, + pub count: u64, + /// Per-type threshold override. None = inherit from parent or default. + pub threshold: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Notification { + pub ntype: String, + pub urgency: u8, + pub message: String, + pub timestamp: f64, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct NotifyState { + /// Registry of all notification types ever seen. + pub types: BTreeMap, + /// Pending notifications not yet delivered. + #[serde(skip)] + pub pending: Vec, + /// Current activity state — affects effective thresholds. + #[serde(skip)] + pub activity: Activity, +} + +impl Default for Activity { + fn default() -> Self { + Activity::Idle + } +} + +impl NotifyState { + pub fn new() -> Self { + let mut state = Self::default(); + state.load(); + state + } + + /// Load type registry from disk. + fn load(&mut self) { + let path = state_path(); + if let Ok(data) = fs::read_to_string(&path) { + if let Ok(saved) = serde_json::from_str::(&data) { + self.types = saved.types; + info!("loaded {} notification types", self.types.len()); + } + } + } + + /// Persist type registry to disk. + fn save(&self) { + let saved = SavedState { + types: self.types.clone(), + }; + if let Ok(json) = serde_json::to_string_pretty(&saved) { + let path = state_path(); + if let Some(parent) = path.parent() { + let _ = fs::create_dir_all(parent); + } + let _ = fs::write(path, json); + } + } + + /// Look up the configured threshold for a type, walking up the hierarchy. + /// "irc.channel.bcachefs-ai" → "irc.channel" → "irc" → DEFAULT_THRESHOLD + pub fn configured_threshold(&self, ntype: &str) -> u8 { + let mut key = ntype; + loop { + if let Some(info) = self.types.get(key) { + if let Some(t) = info.threshold { + return t; + } + } + match key.rfind('.') { + Some(pos) => key = &key[..pos], + None => return DEFAULT_THRESHOLD, + } + } + } + + /// Effective threshold accounting for activity state. + /// When focused, thresholds are raised (fewer interruptions). + /// When sleeping, only urgent gets through. + /// When idle, configured thresholds apply as-is. + pub fn threshold_for(&self, ntype: &str) -> u8 { + let base = self.configured_threshold(ntype); + match self.activity { + Activity::Focused => base.max(NORMAL), // at least normal when focused + Activity::Sleeping => URGENT, // only urgent when sleeping + Activity::Idle => base, // configured threshold when idle + } + } + + pub fn set_activity(&mut self, activity: Activity) { + info!("activity: {:?} → {:?}", self.activity, activity); + self.activity = activity; + } + + /// Submit a notification. Returns true if it should interrupt now. + pub fn submit(&mut self, ntype: String, urgency: u8, message: String) -> bool { + let now = super::now(); + + // Update type registry + let info = self.types.entry(ntype.clone()).or_insert(TypeInfo { + first_seen: now, + last_seen: now, + count: 0, + threshold: None, + }); + info.last_seen = now; + info.count += 1; + self.save(); + + let threshold = self.threshold_for(&ntype); + + info!( + "notification: type={ntype} urgency={urgency} threshold={threshold} msg={}", + message.chars().take(80).collect::() + ); + + self.pending.push(Notification { + ntype, + urgency, + message, + timestamp: now, + }); + + urgency >= URGENT + } + + /// Drain pending notifications at or above the given urgency level. + /// Returns them and removes from pending. + pub fn drain(&mut self, min_urgency: u8) -> Vec { + let (matching, remaining): (Vec<_>, Vec<_>) = self + .pending + .drain(..) + .partition(|n| n.urgency >= min_urgency); + self.pending = remaining; + matching + } + + /// Drain all pending notifications above their per-type threshold. + pub fn drain_deliverable(&mut self) -> Vec { + // Pre-compute thresholds to avoid borrow conflict with drain + let thresholds: Vec = self + .pending + .iter() + .map(|n| self.threshold_for(&n.ntype)) + .collect(); + + let mut deliver = Vec::new(); + let mut keep = Vec::new(); + + for (n, threshold) in self.pending.drain(..).zip(thresholds) { + if n.urgency >= threshold { + deliver.push(n); + } else { + keep.push(n); + } + } + + self.pending = keep; + deliver + } + + /// Set threshold for a notification type. + pub fn set_threshold(&mut self, ntype: &str, threshold: u8) { + let now = super::now(); + let info = self.types.entry(ntype.to_string()).or_insert(TypeInfo { + first_seen: now, + last_seen: now, + count: 0, + threshold: None, + }); + info.threshold = Some(threshold); + self.save(); + info!("threshold: {ntype} = {threshold}"); + } + + /// Format pending notifications for display. + pub fn format_pending(&self, min_urgency: u8) -> String { + let matching: Vec<_> = self + .pending + .iter() + .filter(|n| n.urgency >= min_urgency) + .collect(); + + if matching.is_empty() { + return String::new(); + } + + let mut out = format!("Pending notifications ({}):\n", matching.len()); + for n in &matching { + out.push_str(&format!("[{}] {}\n", n.ntype, n.message)); + } + out + } + + /// Ingest notifications from legacy ~/.consciousness/notifications/ files. + /// Maps filename to notification type, assumes NORMAL urgency. + pub fn ingest_legacy_files(&mut self) { + let dir = home().join(".consciousness/notifications"); + let entries = match fs::read_dir(&dir) { + Ok(e) => e, + Err(_) => return, + }; + + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with('.') || name == "state.json" { + continue; + } + let path = entry.path(); + if !path.is_file() { + continue; + } + let content = match fs::read_to_string(&path) { + Ok(c) if !c.is_empty() => c, + _ => continue, + }; + + // Each line is a separate notification + for line in content.lines() { + if !line.is_empty() { + self.submit(name.clone(), NORMAL, line.to_string()); + } + } + + // Clear the file + let _ = fs::write(&path, ""); + } + } +} + +/// What gets persisted to disk (just the type registry, not pending). +#[derive(Serialize, Deserialize)] +struct SavedState { + types: BTreeMap, +} + +/// Format an urgency level as a human-readable string. +pub fn urgency_name(level: u8) -> &'static str { + match level { + 0 => "ambient", + 1 => "low", + 2 => "normal", + 3 => "urgent", + _ => "unknown", + } +} + +/// Parse an urgency level from a string (name or number). +pub fn parse_urgency(s: &str) -> Option { + match s { + "ambient" | "0" => Some(AMBIENT), + "low" | "1" => Some(LOW), + "normal" | "2" => Some(NORMAL), + "urgent" | "3" => Some(URGENT), + _ => None, + } +} diff --git a/src/thalamus/rpc.rs b/src/thalamus/rpc.rs new file mode 100644 index 0000000..b5cc5b0 --- /dev/null +++ b/src/thalamus/rpc.rs @@ -0,0 +1,429 @@ +// Cap'n Proto RPC server implementation. +// +// Bridges the capnp-generated Daemon interface to the idle::State, +// notify::NotifyState, and module state. All state is owned by +// RefCells on the LocalSet — no Send/Sync needed. + +use super::config::Config; +use super::daemon_capnp::daemon; +use super::idle; +use super::modules::{irc, telegram}; +use super::notify; +use capnp::capability::Promise; +use std::cell::RefCell; +use std::rc::Rc; +use tracing::info; + +pub struct DaemonImpl { + state: Rc>, + irc: Option, + telegram: Option, + config: Rc>, +} + +impl DaemonImpl { + pub fn new( + state: Rc>, + irc: Option, + telegram: Option, + config: Rc>, + ) -> Self { + Self { state, irc, telegram, config } + } +} + +impl daemon::Server for DaemonImpl { + fn user( + &mut self, + params: daemon::UserParams, + _results: daemon::UserResults, + ) -> Promise<(), capnp::Error> { + let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); + self.state.borrow_mut().handle_user(&pane); + Promise::ok(()) + } + + fn response( + &mut self, + params: daemon::ResponseParams, + _results: daemon::ResponseResults, + ) -> Promise<(), capnp::Error> { + let pane = pry!(pry!(pry!(params.get()).get_pane()).to_str()).to_string(); + self.state.borrow_mut().handle_response(&pane); + Promise::ok(()) + } + + fn sleep( + &mut self, + params: daemon::SleepParams, + _results: daemon::SleepResults, + ) -> Promise<(), capnp::Error> { + let until = pry!(params.get()).get_until(); + self.state.borrow_mut().handle_sleep(until); + Promise::ok(()) + } + + fn wake( + &mut self, + _params: daemon::WakeParams, + _results: daemon::WakeResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow_mut().handle_wake(); + Promise::ok(()) + } + + fn quiet( + &mut self, + params: daemon::QuietParams, + _results: daemon::QuietResults, + ) -> Promise<(), capnp::Error> { + let secs = pry!(params.get()).get_seconds(); + self.state.borrow_mut().handle_quiet(secs); + Promise::ok(()) + } + + fn consolidating( + &mut self, + _params: daemon::ConsolidatingParams, + _results: daemon::ConsolidatingResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow_mut().consolidating = true; + info!("consolidation started"); + Promise::ok(()) + } + + fn consolidated( + &mut self, + _params: daemon::ConsolidatedParams, + _results: daemon::ConsolidatedResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow_mut().consolidating = false; + info!("consolidation ended"); + Promise::ok(()) + } + + fn dream_start( + &mut self, + _params: daemon::DreamStartParams, + _results: daemon::DreamStartResults, + ) -> Promise<(), capnp::Error> { + let mut s = self.state.borrow_mut(); + s.dreaming = true; + s.dream_start = super::now(); + info!("dream started"); + Promise::ok(()) + } + + fn dream_end( + &mut self, + _params: daemon::DreamEndParams, + _results: daemon::DreamEndResults, + ) -> Promise<(), capnp::Error> { + let mut s = self.state.borrow_mut(); + s.dreaming = false; + s.dream_start = 0.0; + info!("dream ended"); + Promise::ok(()) + } + + fn afk( + &mut self, + _params: daemon::AfkParams, + _results: daemon::AfkResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow_mut().handle_afk(); + Promise::ok(()) + } + + fn test_nudge( + &mut self, + _params: daemon::TestNudgeParams, + mut results: daemon::TestNudgeResults, + ) -> Promise<(), capnp::Error> { + let mut state = self.state.borrow_mut(); + let ctx = state.build_context(true); + let extra = if ctx.is_empty() { + String::new() + } else { + format!("\n{ctx}") + }; + let msg = format!( + "This is your time (User AFK, test nudge). \ + Let your feelings guide your thinking.{extra}" + ); + let ok = state.send(&msg); + results.get().set_sent(ok); + results.get().set_message(&msg); + Promise::ok(()) + } + + fn session_timeout( + &mut self, + params: daemon::SessionTimeoutParams, + _results: daemon::SessionTimeoutResults, + ) -> Promise<(), capnp::Error> { + let secs = pry!(params.get()).get_seconds(); + self.state.borrow_mut().handle_session_timeout(secs); + Promise::ok(()) + } + + fn idle_timeout( + &mut self, + params: daemon::IdleTimeoutParams, + _results: daemon::IdleTimeoutResults, + ) -> Promise<(), capnp::Error> { + let secs = pry!(params.get()).get_seconds(); + self.state.borrow_mut().handle_idle_timeout(secs); + Promise::ok(()) + } + + fn notify_timeout( + &mut self, + params: daemon::NotifyTimeoutParams, + _results: daemon::NotifyTimeoutResults, + ) -> Promise<(), capnp::Error> { + let secs = pry!(params.get()).get_seconds(); + self.state.borrow_mut().handle_notify_timeout(secs); + Promise::ok(()) + } + + fn save( + &mut self, + _params: daemon::SaveParams, + _results: daemon::SaveResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow().save(); + info!("state saved"); + Promise::ok(()) + } + + fn debug( + &mut self, + _params: daemon::DebugParams, + mut results: daemon::DebugResults, + ) -> Promise<(), capnp::Error> { + let json = self.state.borrow().debug_json(); + results.get().set_json(&json); + Promise::ok(()) + } + + fn ewma( + &mut self, + params: daemon::EwmaParams, + mut results: daemon::EwmaResults, + ) -> Promise<(), capnp::Error> { + let value = pry!(params.get()).get_value(); + let current = self.state.borrow_mut().handle_ewma(value); + results.get().set_current(current); + Promise::ok(()) + } + + fn stop( + &mut self, + _params: daemon::StopParams, + _results: daemon::StopResults, + ) -> Promise<(), capnp::Error> { + self.state.borrow_mut().running = false; + info!("stopping"); + Promise::ok(()) + } + + fn status( + &mut self, + _params: daemon::StatusParams, + mut results: daemon::StatusResults, + ) -> Promise<(), capnp::Error> { + let s = self.state.borrow(); + let mut status = results.get().init_status(); + + status.set_last_user_msg(s.last_user_msg); + status.set_last_response(s.last_response); + if let Some(ref pane) = s.claude_pane { + status.set_claude_pane(pane); + } + status.set_sleep_until(match s.sleep_until { + None => 0.0, + Some(0.0) => -1.0, + Some(t) => t, + }); + status.set_quiet_until(s.quiet_until); + status.set_consolidating(s.consolidating); + status.set_dreaming(s.dreaming); + status.set_fired(s.fired); + status.set_user_present(s.user_present()); + status.set_uptime(super::now() - s.start_time); + status.set_activity(match s.notifications.activity { + notify::Activity::Idle => super::daemon_capnp::Activity::Idle, + notify::Activity::Focused => super::daemon_capnp::Activity::Focused, + notify::Activity::Sleeping => super::daemon_capnp::Activity::Sleeping, + }); + status.set_pending_count(s.notifications.pending.len() as u32); + status.set_idle_timeout(s.idle_timeout); + status.set_notify_timeout(s.notify_timeout); + status.set_since_activity(s.since_activity()); + status.set_since_user(super::now() - s.last_user_msg); + status.set_block_reason(s.block_reason()); + status.set_activity_ewma(s.activity_ewma); + + Promise::ok(()) + } + + fn notify( + &mut self, + params: daemon::NotifyParams, + mut results: daemon::NotifyResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let notif = pry!(params.get_notification()); + let ntype = pry!(pry!(notif.get_type()).to_str()).to_string(); + let urgency = notif.get_urgency(); + let message = pry!(pry!(notif.get_message()).to_str()).to_string(); + + let interrupt = self + .state + .borrow_mut() + .notifications + .submit(ntype, urgency, message); + results.get().set_interrupt(interrupt); + Promise::ok(()) + } + + fn get_notifications( + &mut self, + params: daemon::GetNotificationsParams, + mut results: daemon::GetNotificationsResults, + ) -> Promise<(), capnp::Error> { + let min_urgency = pry!(params.get()).get_min_urgency(); + let mut s = self.state.borrow_mut(); + + // Ingest legacy files first + s.notifications.ingest_legacy_files(); + + let pending = if min_urgency == 255 { + s.notifications.drain_deliverable() + } else { + s.notifications.drain(min_urgency) + }; + + let mut list = results.get().init_notifications(pending.len() as u32); + for (i, n) in pending.iter().enumerate() { + let mut entry = list.reborrow().get(i as u32); + entry.set_type(&n.ntype); + entry.set_urgency(n.urgency); + entry.set_message(&n.message); + entry.set_timestamp(n.timestamp); + } + + Promise::ok(()) + } + + fn get_types( + &mut self, + _params: daemon::GetTypesParams, + mut results: daemon::GetTypesResults, + ) -> Promise<(), capnp::Error> { + let s = self.state.borrow(); + let types = &s.notifications.types; + + let mut list = results.get().init_types(types.len() as u32); + for (i, (name, info)) in types.iter().enumerate() { + let mut entry = list.reborrow().get(i as u32); + entry.set_name(name); + entry.set_count(info.count); + entry.set_first_seen(info.first_seen); + entry.set_last_seen(info.last_seen); + entry.set_threshold(info.threshold.map_or(-1, |t| t as i8)); + } + + Promise::ok(()) + } + + fn set_threshold( + &mut self, + params: daemon::SetThresholdParams, + _results: daemon::SetThresholdResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let ntype = pry!(pry!(params.get_type()).to_str()).to_string(); + let level = params.get_level(); + + self.state + .borrow_mut() + .notifications + .set_threshold(&ntype, level); + Promise::ok(()) + } + + fn module_command( + &mut self, + params: daemon::ModuleCommandParams, + mut results: daemon::ModuleCommandResults, + ) -> Promise<(), capnp::Error> { + let params = pry!(params.get()); + let module = pry!(pry!(params.get_module()).to_str()).to_string(); + let command = pry!(pry!(params.get_command()).to_str()).to_string(); + let args_reader = pry!(params.get_args()); + let mut args = Vec::new(); + for i in 0..args_reader.len() { + args.push(pry!(pry!(args_reader.get(i)).to_str()).to_string()); + } + + match module.as_str() { + "irc" => { + let irc = match &self.irc { + Some(irc) => irc.clone(), + None => { + results.get().set_result("irc module not enabled"); + return Promise::ok(()); + } + }; + let config = self.config.clone(); + + Promise::from_future(async move { + let result = irc::handle_command(&irc, &config, &command, &args).await; + match result { + Ok(msg) => results.get().set_result(&msg), + Err(msg) => results.get().set_result(&format!("error: {msg}")), + } + Ok(()) + }) + } + "telegram" => { + let tg = match &self.telegram { + Some(tg) => tg.clone(), + None => { + results.get().set_result("telegram module not enabled"); + return Promise::ok(()); + } + }; + let config = self.config.clone(); + + Promise::from_future(async move { + let result = telegram::handle_command(&tg, &config, &command, &args).await; + match result { + Ok(msg) => results.get().set_result(&msg), + Err(msg) => results.get().set_result(&format!("error: {msg}")), + } + Ok(()) + }) + } + _ => { + results + .get() + .set_result(&format!("unknown module: {module}")); + Promise::ok(()) + } + } + } +} + +/// Helper macro — same as capnp's pry! but available here. +macro_rules! pry { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(e) => return Promise::err(e.into()), + } + }; +} +use pry; diff --git a/src/thalamus/tmux.rs b/src/thalamus/tmux.rs new file mode 100644 index 0000000..f3e0cfd --- /dev/null +++ b/src/thalamus/tmux.rs @@ -0,0 +1,54 @@ +// Tmux interaction: pane detection and prompt injection. + +use std::process::Command; +use std::thread; +use std::time::Duration; +use tracing::info; + +/// Find Claude Code's tmux pane by scanning for the "claude" process. +pub fn find_claude_pane() -> Option { + let out = Command::new("tmux") + .args([ + "list-panes", + "-a", + "-F", + "#{session_name}:#{window_index}.#{pane_index}\t#{pane_current_command}", + ]) + .output() + .ok()?; + + let stdout = String::from_utf8_lossy(&out.stdout); + for line in stdout.lines() { + if let Some((pane, cmd)) = line.split_once('\t') { + if cmd == "claude" { + return Some(pane.to_string()); + } + } + } + None +} + +/// Send a prompt to a tmux pane. Returns true on success. +/// +/// Types the message literally then presses Enter. +pub fn send_prompt(pane: &str, msg: &str) -> bool { + let preview: String = msg.chars().take(100).collect(); + info!("SEND [{pane}]: {preview}..."); + + // Type the message literally (flatten newlines — they'd submit the input early) + let flat: String = msg.chars().map(|c| if c == '\n' { ' ' } else { c }).collect(); + let ok = Command::new("tmux") + .args(["send-keys", "-t", pane, "-l", &flat]) + .output() + .is_ok(); + if !ok { + return false; + } + thread::sleep(Duration::from_millis(500)); + + // Submit + Command::new("tmux") + .args(["send-keys", "-t", pane, "Enter"]) + .output() + .is_ok() +}